-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi.py
323 lines (247 loc) · 9.24 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.logger import logger
from pydantic import BaseModel
import requests
import os
import google_auth_oauthlib.flow
import logging
import db
import ai
import json
import flow
from starlette.middleware.sessions import SessionMiddleware
from constants import BASE_URL, GOOGLE_SCOPES
import FirebaseService
from dotenv import load_dotenv
from supabase.client import create_client
from terra.base_client import Terra
from supabase.client import create_client
from dotenv import load_dotenv
load_dotenv()
API_URL = os.getenv("API_URL")
API_KEY = os.getenv("API_KEY")
if not API_URL or not API_KEY:
raise Exception("Supabase API_URL or API_KEY not found in .env file")
supabase = create_client(API_URL, API_KEY)
google_oauth_client_config = {
"web": {
"client_id": os.environ.get("GOOGLE_CLIENT_ID"),
"project_id":"avon2-402806","auth_uri":"https://accounts.google.com/o/oauth2/auth",
"token_uri":"https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs",
"client_secret": os.environ.get("GOOGLE_CLIENT_SECRET"),
"redirect_uris":["https://avon-seven.vercel.app/google_oauth_callback"],
"javascript_origins":["http://localhost:8081"]
}
}
# Ensure that all requests include an 'example.com' or
# '*.example.com' host header, and strictly enforce https-only access.
# middleware = [
# # Middleware(
# # TrustedHostMiddleware,
# # allowed_hosts=['example.com', '*.example.com'],
# # ),
# # Middleware(HTTPSRedirectMiddleware),
# Middleware(SessionMiddleware(secret_key=environ.get('GOOGLE_CLIENT_SECRET')))
# ]
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key=os.environ.get("GOOGLE_CLIENT_SECRET"))
@app.on_event("startup")
async def startup_event():
global logger
logger = logging.getLogger("uvicorn.access")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/ping")
async def ping():
return {"message": "pong"}
@app.get("/users/")
async def get_users():
return db.get_users()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
return db.get_user(user_id)
@app.get("/users/telegram/{telegram_user_id}")
async def get_user_by_telegram_id(telegram_user_id: str):
return db.get_user_by_telegram_id(telegram_user_id)
@app.get("/users/email/{email}")
async def get_user_by_email(email: str):
return db.get_user_by_email(email)
@app.put("/users")
async def add_user(
username: str,
name: str,
email: str,
google_refresh_token: str,
):
return db.add_user(username, name, email, google_refresh_token)
@app.patch("/users/{user_id}")
async def edit_user(
user_id: int,
username: str,
name: str,
email: str,
google_refresh_token: str,
):
return db.edit_user(user_id, username, name, email, google_refresh_token)
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
return db.delete_user(user_id)
@app.get("/tasks/")
async def get_tasks():
return db.get_tasks()
@app.get("/tasks/{user_id}")
async def get_tasks_from_user(user_id: int):
return db.get_tasks_from_user(user_id)
@app.put("/tasks")
async def add_task(
user_id: int, name: str, duration: str, deadline: str):
return db.add_task(user_id, name, int(duration or "0"), deadline)
@app.patch("/tasks/{task_id}")
async def edit_task(task_id: int, name: str, description: str = ""):
return db.edit_task(task_id, name, description)
@app.delete("/tasks/{task_id}")
async def delete_task(task_id: int):
return db.delete_task(task_id)
@app.post("/task/added/{task_id}")
async def mark_task_as_added(task_id: int):
return db.mark_task_as_added(task_id)
@app.patch("/planTasks/{user_id}")
def plan_tasks(user_id: int):
return ai.plan_tasks(user_id)
@app.get("/get_google_oauth_url")
async def get_google_oauth_url(request: Request):
"""
Route to get Google Oauth URL
params:
telegram_user_id: int
"""
params = request.query_params
telegram_user_id = params.get("telegram_user_id")
username = params.get("username")
if (not telegram_user_id or not telegram_user_id.isdigit()) or (not username):
raise Exception("Invalid telegram_user_id and/or username")
telegram_user_id = int(telegram_user_id)
state_dict = {
"telegram_user_id": telegram_user_id,
"username": username,
}
state_dict_dumps = json.dumps(state_dict)
return get_google_login_url(state_dict_str=state_dict_dumps)
@app.post("/google_oauth_callback")
async def google_oauth_callback(request: Request, response: Response):
"""
Route to handle Google Oauth callback after user has logged in
"""
state, username, telegram_user_id = None, None, None
if "state" in request.query_params:
state_dict_dumps = request.query_params.get("state")
if not state_dict_dumps:
raise Exception("state not in session")
state_dict = json.loads(state_dict_dumps)
telegram_user_id = state_dict.get("telegram_user_id", "")
username = state_dict.get("username", "")
else:
raise Exception("state not in session")
flow = google_auth_oauthlib.flow.Flow.from_client_config(
client_config=get_google_oauth_client_config(),
scopes=GOOGLE_SCOPES,
state=state,
)
flow.redirect_uri = BASE_URL + "/google_oauth_callback"
code = request.query_params.get("code")
flow.fetch_token(code=code)
# Store the credentials in the session.
credentials = flow.credentials
request.session["credentials"] = {
"token": credentials.token,
"refresh_token": credentials.refresh_token,
# 'token_uri': credentials.token_uri,
"scopes": credentials.scopes,
}
# TODO - save user with refresh_token to db
logger.info("Done with Google Oauth!")
# Create a Response object with the 307 redirect status code
response = Response(status_code=307, headers={"Location": TELEGRAM_BOT_LINK})
# Send the Response object
return response
TERRA_API_KEY = os.getenv("TERRA_API_KEY")
TERRA_DEV_ID = os.getenv("TERRA_DEV_ID")
TERRA_SIGNING_SECRET = os.getenv("TERRA_SIGNING_SECRET")
if not TERRA_API_KEY or not TERRA_DEV_ID:
raise Exception(
"TERRA_API_KEY or TERRA_DEV_ID or TERRA_SIGNING_SECRET not found in .env file"
)
terra = Terra(
api_key=TERRA_API_KEY, dev_id=TERRA_DEV_ID, secret=TERRA_SIGNING_SECRET or ""
)
@app.post("/generateWidgetSession")
async def generate_widget_session(user_id: int):
widget_response = terra.generate_widget_session(
reference_id=user_id,
providers=["GARMIN"],
auth_success_redirect_url="https://avon-seven.vercel.app/widgetResponseSuccess",
auth_failure_redirect_url="https://avon-seven.vercel.app/widgetResponseFailure",
language="en",
).get_parsed_response()
return {"widget_response": widget_response}
@app.get("/widgetResponseSuccess")
async def handle_widget_response_success(
user_id: str, reference_id: str, resource: str
):
db.edit_user(reference_id, terra_user_id=user_id)
return {"response": "close the browser window"}
@app.get("/widgetResponseFailure")
async def handle_widget_response_failure(
user_id: str, resource: str, reference_id: str, lan: str, reason: str
):
# TODO: Handle failure
db.edit_user(reference_id, terra_user_id=user_id)
return {"response": "close the browser window"}
@app.get("/listTerraUsers")
async def list_terra_users():
return {"users": terra.list_users().get_parsed_response()}
@app.post("/consumeTerraWebhook")
async def consume_terra_webhook(request: Request):
body = await request.body()
body_dict = json.loads(body.decode())
print(body_dict)
return {"user": body_dict.get("user", {}).get("user_id"), "type": body_dict["type"], "data": body_dict["data"]}
# for testing purposes only
@app.post("/message/{user_id}")
def message(user_id: int, message: str):
return db.message(user_id, message)
class TokenData(BaseModel):
userId: str
token: str
@app.post("/registerPushToken")
async def register_push_token(request: TokenData):
user_id = str(request.userId)
token = str(request.token)
try:
await FirebaseService.save_token(user_id, token)
return {"status": "success"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/send_notification/{user_id}")
async def send_notification(user_id: int, token: str, message: str):
email = db.get_user(user_id).get("email")
url = "https://exp.host/--/api/v2/push/send"
data = {
"to": token,
"title": "Avon Notification",
"body": message
}
response = requests.post(url, json=data)
FirebaseService.write_chat_message(email, message, None)
return {"status": "Notification sent", "response": response.json()}
@app.patch("/reply/{todo_id}")
async def reply(todo_id: int, message: str):
return flow.reply_flow(todo_id, message)
@app.post("/reply")
async def add_new_task(message: str, email: str):
return flow.add_new_task_flow(message, email)