-
Notifications
You must be signed in to change notification settings - Fork 237
/
Copy pathmain.py
201 lines (170 loc) · 5.86 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from fastapi import (
HTTPException,
FastAPI,
Depends,
)
import requests
import logging
import asyncio
import httpx
import yaml
import sys
import os
# ---------
# Constants
# ---------
# Set to True by update_credentials_file() after the credentials file is written;
# the health-check dependency reads it.
CREDENTIALS_READY = False
# Upper bound compared against the retry counter in get_tunnel().
RETRY_LIMIT = 10
# Seconds slept between polling attempts.
RETRY_INTERVAL = 15
# ----------------
# Environment vars
# ----------------
NGROK_HOST = os.environ.get("NGROK_HOST", "ngrok")
NGROK_PORT = os.environ.get("NGROK_PORT", 4040)
NGROK_INTERNAL_WEBHOOK_HOST = os.environ.get(
    "NGROK_INTERNAL_WEBHOOK_HOST", "rasa-core"
)
NGROK_INTERNAL_WEBHOOK_PORT = os.environ.get("NGROK_INTERNAL_WEBHOOK_PORT", 5005)
# Base URL of the ngrok API used by every tunnel helper in this module.
NGROK_API_URL = f"http://{NGROK_HOST}:{NGROK_PORT}"
TELEGRAM_ACCESS_TOKEN = os.environ.get("TELEGRAM_ACCESS_TOKEN")
TELEGRAM_BOTNAME = os.environ.get("TELEGRAM_BOTNAME")
CREDENTIALS_PATH = os.environ.get("CREDENTIALS_PATH", "/app/rasa/credentials.yml")
# -------
# Logging
# -------
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Log the resolved ngrok settings once at import time.
logger.debug(
    f"NGROK_HOST: {NGROK_HOST}:{NGROK_PORT}\n"
    f"NGROK_API_URL: {NGROK_API_URL}\n"
    f"NGROK_INTERNAL_WEBHOOK_HOST: {NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}"
)
# ---------------------------------
# Wait for ngrok API to come online
# ---------------------------------
async def wait_for_ngrok_api():
    """Block until the ngrok API answers a tunnel listing successfully.

    Repeatedly issues ``GET {NGROK_API_URL}/api/tunnels``. Whenever the
    request fails, it sleeps ``RETRY_INTERVAL`` seconds and tries again;
    there is no upper bound on the number of attempts.

    Returns:
        True, once a request completes with a successful status.
    """
    while True:
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{NGROK_API_URL}/api/tunnels")
                response.raise_for_status()
        except httpx.HTTPError:
            # httpx.HTTPError is the shared base of RequestError (transport
            # failures) and HTTPStatusError (what raise_for_status() raises).
            # Catching only RequestError let an error status from a
            # still-starting ngrok escape the loop and abort startup.
            logger.debug("ngrok API is offline. Waiting...")
            await asyncio.sleep(RETRY_INTERVAL)
        else:
            logger.debug("ngrok API is online.")
            return True
# -------------------------------------
# Fetch list of active tunnels on ngrok
# -------------------------------------
async def get_active_tunnels():
    """Return the tunnels currently reported by the ngrok API.

    Issues ``GET {NGROK_API_URL}/api/tunnels`` and returns the value of the
    ``"tunnels"`` key of the JSON reply.

    Returns:
        The list of tunnel entries, or an empty list when the request fails
        (connection error, timeout, error status) or the reply carries no
        tunnels.
    """
    try:
        # A timeout is required: requests waits forever by default, and this
        # synchronous call runs on the event loop thread.
        response = requests.get(f"{NGROK_API_URL}/api/tunnels", timeout=10)
        response.raise_for_status()
        tunnels = response.json().get("tunnels") or []
    except requests.exceptions.RequestException as e:
        # RequestException covers HTTPError as well as connection errors and
        # timeouts, so every request failure maps to "no tunnels".
        logger.warning(f"Could not list ngrok tunnels: {e}")
        tunnels = []
    return tunnels
# -----------------
# Stop ngrok tunnel
# -----------------
async def stop_tunnel(tunnel):
    """Delete a single tunnel through the ngrok API.

    Args:
        tunnel: Mapping describing the tunnel; its ``"name"`` entry is used
            as the identifier in the request path.

    Raises:
        requests.exceptions.HTTPError: If the API replies with an error status.
    """
    endpoint = f"{NGROK_API_URL}/api/tunnels/{tunnel['name']}"
    requests.delete(endpoint).raise_for_status()
# ----------------------
# Stop all ngrok tunnels
# ----------------------
async def stop_all_tunnels():
    """Stop every tunnel returned by get_active_tunnels(), one after another.

    Logs a debug line and does nothing else when no tunnels are reported.
    """
    tunnels = await get_active_tunnels()
    if not tunnels:
        logger.debug("No active tunnels found.")
        return

    for entry in tunnels:
        name, public_url = entry["name"], entry["public_url"]
        logger.debug(f"Stopping tunnel: {name} ({public_url})")
        await stop_tunnel(entry)
# -------------------------------------
# Get the first ngrok tunnel w/ retries
# -------------------------------------
async def get_tunnel(retry=0):
    """Return the public URL of the first active tunnel, polling until one exists.

    Args:
        retry: Number of attempts already consumed; polling continues while
            this counter does not exceed ``RETRY_LIMIT``.

    Returns:
        The ``"public_url"`` of the first entry from get_active_tunnels().

    Raises:
        Exception: When the counter exceeds ``RETRY_LIMIT`` without any
            tunnel having been reported.
    """
    attempt = retry
    while attempt <= RETRY_LIMIT:
        tunnels = await get_active_tunnels()
        if len(tunnels) != 0:
            return tunnels[0]["public_url"]
        logger.debug(f"No active tunnels found. Trying again in {RETRY_INTERVAL}s..")
        await asyncio.sleep(RETRY_INTERVAL)
        attempt += 1

    raise Exception(
        f"Could not create ngrok tunnel. Exceed retry limit of {RETRY_LIMIT} attempts."
    )
# -------------------
# Create ngrok tunnel
# -------------------
async def create_tunnel():
    """Ask the ngrok API to open an HTTP tunnel to the internal webhook host.

    Posts to ``{NGROK_API_URL}/api/tunnels`` with the internal webhook
    address, protocol ``http`` and the webhook host as the tunnel name.

    Returns:
        The ``"public_url"`` from the JSON reply, or False when the API
        answers with an error status (a warning is logged in that case).
    """
    target = f"{NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}"
    payload = {
        "addr": target,
        "proto": "http",
        "name": NGROK_INTERNAL_WEBHOOK_HOST,
    }
    reply = requests.post(f"{NGROK_API_URL}/api/tunnels", json=payload)
    try:
        reply.raise_for_status()
    except requests.exceptions.HTTPError as e:
        logger.warning(f"Error creating ngrok tunnel: {e}")
        return False
    return reply.json()["public_url"]
# ----------------------------
# Update Rasa credentials file
# ----------------------------
async def update_credentials_file(ngrok_url):
    """Rewrite the Telegram section of the credentials YAML file.

    Loads ``CREDENTIALS_PATH``, sets ``webhook_url``, ``access_token`` and
    ``verify`` under the ``custom_telegram.CustomTelegramInput`` key, writes
    the file back and marks ``CREDENTIALS_READY`` as True.

    Args:
        ngrok_url: Public tunnel URL; the webhook path is appended to it.

    On any exception the error is logged and the process exits with status 1.
    """
    global CREDENTIALS_READY
    section_key = "custom_telegram.CustomTelegramInput"
    try:
        with open(CREDENTIALS_PATH, "r") as src:
            config = yaml.safe_load(src)

        config[section_key]["webhook_url"] = f"{ngrok_url}/webhooks/telegram/webhook"
        config[section_key]["access_token"] = TELEGRAM_ACCESS_TOKEN
        config[section_key]["verify"] = TELEGRAM_BOTNAME

        with open(CREDENTIALS_PATH, "w") as dst:
            yaml.safe_dump(config, dst)

        # Only reached when both the read and the write succeeded.
        CREDENTIALS_READY = True
    except Exception as e:
        logger.warning(f"Error updating {CREDENTIALS_PATH}: {e}")
        sys.exit(1)
# -----------------
# FastAPI endpoints
# -----------------
# Application instance; the startup hook and the health-check route below
# are registered on it through its decorators.
app = FastAPI()
# -------------
# Startup event
# -------------
@app.on_event("startup")
async def startup_event():
    """Prepare the ngrok tunnel and credentials file when running in dev.

    Only acts when the ``ENV`` environment variable is one of ``dev``,
    ``development`` or ``local`` (case-insensitive). In that case it waits
    for the ngrok API, looks up an existing tunnel, creates one if none is
    found, and writes the resulting URL into the credentials file.

    Raises:
        RuntimeError: If no tunnel exists and one could not be created, so
            no URL is available to write.
    """
    env = os.getenv("ENV", None)
    if not env or env.lower() not in ["dev", "development", "local"]:
        logger.debug("Not in dev environment. Skipping.")
        return

    await wait_for_ngrok_api()

    try:
        url = await get_tunnel()
    except Exception as e:
        # get_tunnel() reports "no tunnel after all retries" by raising a
        # plain Exception and never returns a falsy value, so without this
        # handler the create_tunnel() fallback below could never run.
        logger.warning(f"{e}")
        url = None

    if not url:
        logger.debug("No active tunnels found. Creating one...")
        url = await create_tunnel()

    if not url:
        # create_tunnel() returns False on failure; writing that into the
        # credentials file would produce a bogus "False/webhooks/..." URL.
        raise RuntimeError("Could not obtain an ngrok tunnel URL.")

    logger.debug(f"Tunnel url: {url}")
    await update_credentials_file(url)
# ---------------------
# Endpoint dependencies
# ---------------------
async def check_endpoint_availability():
    """Dependency that rejects requests until the credentials file is ready.

    Returns:
        True when ``CREDENTIALS_READY`` is set.

    Raises:
        HTTPException: With status 403 while ``CREDENTIALS_READY`` is falsy.
    """
    if CREDENTIALS_READY:
        return True
    raise HTTPException(status_code=403, detail="Endpoint not available yet")
# ---------------------
# Health check endpoint
# ---------------------
# This endpoint is used by docker-compose to check if the
# container is ready. If it is ready, Rasa core can start
@app.get("/", dependencies=[Depends(check_endpoint_availability)])
async def health_check():
    """Report readiness; only reached once the availability dependency passes."""
    payload = {"status": "ok"}
    return payload