-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDB_basemodel_bcrypt(verification)_token(authentication).py
230 lines (175 loc) · 7.46 KB
/
DB_basemodel_bcrypt(verification)_token(authentication).py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
%%%%%%%%%%%%%%%%%%% auth.py %%%%%%%%%%%%%%%%%%%%%%%
*********************************** Part O *************************************
from fastapi import FastAPI
# Fixed: the name was misspelled "Bas_eModel", which fails at import time and
# leaves the BaseModel used by the request model below undefined.
from pydantic import BaseModel
from passlib.context import CryptContext

# Shared passlib context: every password hash/verify in this file goes
# through it, using the bcrypt scheme.
bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password):
    """Return the hash of ``password`` produced by the module-level ``bcrypt_context``."""
    hashed = bcrypt_context.hash(password)
    return hashed
# FastAPI application instance; the @app.<method> decorators in this file
# register their handlers on it.
app = FastAPI()
class connection(BaseModel):
    """Request body accepted by ``POST /post/connection``.

    Three float fields plus a string ``target``; the handler returns the
    floats unchanged and the hash of ``target``.
    """

    SBP: float
    Rate: float
    overShoot: float
    target: str
@app.post('/post/connection')
async def create_a_connection(conn: connection):
    """Echo the numeric fields of ``conn`` followed by the hash of its ``target``.

    Returns:
        A four-element list: ``[SBP, Rate, overShoot, hashed target]``.
    """
    hashed_target = get_password_hash(conn.target)
    return [conn.SBP, conn.Rate, conn.overShoot, hashed_target]
############################################################ verify and authenticate users ##########################################################
*********************************** Part I *************************************
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password`` with ``bcrypt_context``.

    Returns:
        Whatever ``bcrypt_context.verify`` returns for the pair.
    """
    is_match = bcrypt_context.verify(plain_password, hashed_password)
    return is_match
def authenticate_user(username: str, password: str, db):
    """Look up ``username`` and check ``password`` against its stored hash.

    Args:
        username: value matched against ``models.Users.username``.
        password: plain-text password to verify.
        db: session object exposing ``query``.

    Returns:
        The first matching user row when the password verifies; ``False``
        when no row is found or the password does not match.
    """
    matching_users = db.query(models.Users).filter(models.Users.username == username)
    account = matching_users.first()
    if account and verify_password(password, account.hashed_password):
        return account
    return False
from fastapi.security import OAuth2PasswordRequestForm
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(),
                                 db: Session = Depends(get_db)):
    """Validate the submitted username/password form (no token is issued yet).

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: database session injected through ``get_db``.

    Returns:
        The string ``'user validated'`` when the credentials are accepted.

    Raises:
        HTTPException: the 401 built by ``token_exception()`` when
            ``authenticate_user`` rejects the credentials.
    """
    user = authenticate_user(form_data.username, form_data.password, db)
    if not user:
        # A rejected login is an authentication failure, not a missing
        # resource, so use the file's 401 helper (with WWW-Authenticate)
        # instead of a 404.
        raise token_exception()
    return 'user validated'
*********************************** Part II *************************************
from fastapi.security import OAuth2PasswordBearer
from datetime import datetime, timedelta
from jose import jwt, JWTError
import os

# Key used to sign and verify the JWTs issued by this module. It can be
# supplied through the JWT_SECRET_KEY environment variable; the literal is a
# fallback so existing setups keep working when the variable is unset or empty.
# NOTE(review): a signing key committed to source control should be rotated
# and provided via the environment in any real deployment.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or "KlgH6AzYDeZeGwD288to79I3vTHT8wp7"
# Algorithm name passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Bearer-token dependency used by get_current_user; tokenUrl="token" refers to
# the "/token" login route defined in this file.
oauth2_bearer = OAuth2PasswordBearer(tokenUrl="token")
def create_access_token(username: str, user_id: int,
                        expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying the user's name and id.

    Args:
        username: stored in the ``sub`` claim.
        user_id: stored in the ``id`` claim.
        expires_delta: token lifetime; 15 minutes is used when it is ``None``.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Compare against None so an explicit zero-length lifetime is honoured
    # instead of being silently replaced by the 15-minute default (a bare
    # truthiness test treats timedelta(0) as "not supplied").
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value.
    expire = datetime.now(timezone.utc) + lifetime
    claims = {"sub": username, "id": user_id, "exp": expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(),
                                 db: Session = Depends(get_db)):
    """Authenticate the submitted form and issue a 20-minute access token.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: database session injected through ``get_db``.

    Returns:
        ``{"token": <JWT>}`` where the JWT comes from ``create_access_token``.

    Raises:
        HTTPException: the 401 built by ``token_exception()`` when
            ``authenticate_user`` rejects the credentials.
    """
    user = authenticate_user(form_data.username, form_data.password, db)
    if not user:
        # A rejected login is an authentication failure, not a missing
        # resource, so use the file's 401 helper (with WWW-Authenticate)
        # instead of a 404.
        raise token_exception()
    token_expires = timedelta(minutes=20)
    token = create_access_token(user.username,
                                user.id,
                                expires_delta=token_expires)
    return {"token": token}
*********************************** Part III *************************************
from jose import JWTError
async def get_current_user(token: str = Depends(oauth2_bearer)):
    """Decode the bearer ``token`` and return the identity stored in it.

    Returns:
        ``{"username": ..., "id": ...}`` taken from the ``sub`` and ``id``
        claims of the decoded payload.

    Raises:
        The exception built by ``get_user_exception()`` (a 401) when decoding
        raises ``JWTError`` or when either claim is missing.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise get_user_exception()

    username: str = claims.get("sub")
    user_id: int = claims.get("id")
    if username is None or user_id is None:
        raise get_user_exception()
    return {"username": username, "id": user_id}
# exception
from fastapi import status
def get_user_exception():
    """Build (without raising) the 401 error used when credentials cannot be validated."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
def token_exception():
    """Build (without raising) the 401 error for an incorrect username or password."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
%%%%%%%%%%%%%%%%%%%%% go back to main.py, which interacts with the database (paste the token in the header in Postman) %%%%%%%%%%%%%%%%%%%%%%
from auth import get_current_user, get_user_exception
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 3 required inputs
@app.get("/todo/{todo_id}")
async def read_todo(todo_id: int,
                    user: dict = Depends(get_current_user),
                    db: Session = Depends(get_db)):
    """Return the todo ``todo_id`` when it is owned by the authenticated user.

    Raises:
        The exception from ``get_user_exception()`` when ``user`` is None,
        or the one from ``http_exception()`` when no row matches both the id
        and the owner.
    """
    if user is None:
        raise get_user_exception()

    owner_id = user.get("id")
    todo = (
        db.query(models.Todos)
        .filter(models.Todos.id == todo_id)
        .filter(models.Todos.owner_id == owner_id)
        .first()
    )
    if todo is None:
        raise http_exception()
    return todo
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 3 required inputs
@app.post("/")
async def create_todo(todo: Todo,
                      user: dict = Depends(get_current_user),
                      db: Session = Depends(get_db)):
    """Store a new todo owned by the authenticated user.

    The title, description, priority and complete values are copied from
    ``todo``; the owner id is taken from ``user`` (the decoded token), not
    from the request body.

    Returns:
        ``successful_response(201)``.

    Raises:
        The exception from ``get_user_exception()`` when ``user`` is None.
    """
    if user is None:
        raise get_user_exception()

    new_todo = models.Todos()
    for field in ("title", "description", "priority", "complete"):
        setattr(new_todo, field, getattr(todo, field))
    new_todo.owner_id = user.get("id")

    db.add(new_todo)
    db.commit()
    return successful_response(201)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 4 required inputs
@app.put("/{todo_id}")
async def update_todo(todo_id: int,
                      todo: Todo,
                      user: dict = Depends(get_current_user),
                      db: Session = Depends(get_db)):
    """Overwrite the fields of todo ``todo_id`` with the values in ``todo``.

    ``todo`` carries the new values from the request body. Only a row whose
    owner id matches the authenticated user is updated.

    Returns:
        ``successful_response(200)``.

    Raises:
        The exception from ``get_user_exception()`` when ``user`` is None,
        or the one from ``http_exception()`` when no row matches both the id
        and the owner.
    """
    if user is None:
        raise get_user_exception()

    owner_id = user.get("id")
    existing = (
        db.query(models.Todos)
        .filter(models.Todos.id == todo_id)
        .filter(models.Todos.owner_id == owner_id)
        .first()
    )
    if existing is None:
        raise http_exception()

    for field in ("title", "description", "priority", "complete"):
        setattr(existing, field, getattr(todo, field))

    db.add(existing)
    db.commit()
    return successful_response(200)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 3 required inputs
@app.delete("/{todo_id}")
async def delete_todo(todo_id: int,
                      user: dict = Depends(get_current_user),
                      db: Session = Depends(get_db)):
    """Delete todo ``todo_id`` after confirming the authenticated user owns it.

    The owner is identified from the token; asking for a ``todo_id`` that
    belongs to another user ends in the not-found error.

    Returns:
        ``successful_response(200)``.

    Raises:
        The exception from ``get_user_exception()`` when ``user`` is None,
        or the one from ``http_exception()`` when no row matches both the id
        and the owner.
    """
    if user is None:
        raise get_user_exception()

    owner_id = user.get("id")
    owned = (
        db.query(models.Todos)
        .filter(models.Todos.id == todo_id)
        .filter(models.Todos.owner_id == owner_id)
        .first()
    )
    if owned is None:
        raise http_exception()

    # Ownership was confirmed above; the delete itself filters by id only.
    db.query(models.Todos).filter(models.Todos.id == todo_id).delete()
    db.commit()
    return successful_response(200)
def successful_response(status_code: int):
    """Return the success payload: the given status code plus a fixed marker.

    Returns:
        ``{'status': status_code, 'transaction': 'Successful'}``.
    """
    return dict(status=status_code, transaction='Successful')
def http_exception():
    """Build (without raising) the 404 error used when no matching todo is found."""
    not_found = HTTPException(status_code=404, detail="Todo not found")
    return not_found