-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
90 lines (73 loc) · 2.81 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import uvicorn
from fastapi import APIRouter, FastAPI
from fastapi.routing import APIRoute
from starlette.requests import Request
from elasticsearch import AsyncElasticsearch
from logging import getLogger, StreamHandler
from models import CreateUserRequest
logger = getLogger(__name__)
logger.addHandler(StreamHandler())
logger.setLevel("INFO")
MAPPING_FOR_INDEX = {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"surname": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"date_of_birth": {
"type": "date"
},
"interests": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
async def ping() -> dict:
return {"success": True}
async def create_index(request: Request) -> dict:
elastic_client: AsyncElasticsearch = request.app.state.elastic_client
await elastic_client.indices.create(index="users", mappings=MAPPING_FOR_INDEX)
return {"success": True}
async def delete_index(request: Request) -> dict:
elastic_client: AsyncElasticsearch = request.app.state.elastic_client
await elastic_client.indices.delete(index="users")
return {"success": True}
async def create_user(request: Request, body: CreateUserRequest) -> dict:
elastic_client: AsyncElasticsearch = request.app.state.elastic_client
res = await elastic_client.index(index="users", document=body.dict())
logger.info(res)
return {"success": True, "result": res}
async def get_all_users(request: Request):
elastic_client: AsyncElasticsearch = request.app.state.elastic_client
res = await elastic_client.search(index="users", query={"match_all": {}})
return {"success": True, "result": res}
routes = [
APIRoute(path="/ping", endpoint=ping, methods=["GET"]),
APIRoute(path="/create_index", endpoint=create_index, methods=["GET"]),
APIRoute(path="/delete_index", endpoint=delete_index, methods=["GET"]),
APIRoute(path="/create_user", endpoint=create_user, methods=["POST"]),
APIRoute(path="/get_all_users", endpoint=get_all_users, methods=["GET"])
]
elastic_client = AsyncElasticsearch()
app = FastAPI()
app.state.elastic_client = elastic_client
app.include_router(APIRouter(routes=routes))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)