-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathusers.py
140 lines (112 loc) · 4.07 KB
/
users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import io
from datetime import datetime
from flask import Blueprint, send_file
from werkzeug.exceptions import BadRequest
from ..shared.auth import get_current_user, requires_auth
from ..shared.emails import new_user_registration
from ..shared.rest_utils import (
with_lookup,
marshal_response,
unmarshal_request,
use_args_with_pagination,
)
from ..models import (
Users,
UserSchema,
UserListSchema,
CIDCRole,
IntegrityError,
Permissions,
)
from ..config.settings import ENV
# Blueprint on which every user endpoint in this module is registered.
users_bp = Blueprint("users", __name__)

# Schema instances shared by the route decorators below.
user_schema = UserSchema()
user_list_schema = UserListSchema()
# Used by the self-registration endpoint: "approval_date", "role", and
# "disabled" are excluded from this schema.
new_user_schema = UserSchema(exclude=("approval_date", "role", "disabled"))
# Used by the PATCH endpoint. NOTE(review): partial=True presumably lets a
# request supply only a subset of fields -- confirm against UserSchema.
partial_user_schema = UserSchema(partial=True)
@users_bp.route("/self", methods=["GET"])
@requires_auth("self")
@marshal_response(user_schema)
def get_self():
    """Look up the current user and hand their own record back to them."""
    requesting_user = get_current_user()
    return requesting_user
@users_bp.route("/self", methods=["POST"])
@requires_auth("self")
@unmarshal_request(new_user_schema, "user")
@marshal_response(user_schema, 201)
def create_self(user):
    """
    Allow the current user to create a profile for themself. On success,
    send an email to the CIDC mailing list with a registration notification.

    Args:
        user: the new user record unmarshalled from the request body
            using `new_user_schema`.

    Returns:
        The inserted user record.

    Raises:
        BadRequest: if the submitted email differs from the current user's
            email, or if inserting the record raises an IntegrityError.
    """
    current_user = get_current_user()

    # A user may only register a profile for their own email address.
    if current_user.email != user.email:
        raise BadRequest(
            f"{current_user.email} can't create a user with email {user.email}"
        )

    try:
        user.insert()
    except IntegrityError as e:
        # Chain explicitly so the underlying database error is preserved as
        # the cause of the 400 response in logs and tracebacks.
        raise BadRequest(str(e.orig)) from e

    # Only notify once the insert has succeeded.
    new_user_registration(user.email, send_email=True)

    return user
@users_bp.route("/", methods=["POST"])
@requires_auth("users", [CIDCRole.ADMIN.value])
@unmarshal_request(user_schema, "user")
@marshal_response(user_schema, 201)
def create_user(user):
    """
    Allow admins to create user records.

    Args:
        user: the new user record unmarshalled from the request body
            using `user_schema`.

    Returns:
        The inserted user record.

    Raises:
        BadRequest: if inserting the record raises an IntegrityError.
    """
    try:
        user.insert()
    except IntegrityError as e:
        # Chain explicitly so the underlying database error is preserved as
        # the cause of the 400 response in logs and tracebacks.
        raise BadRequest(str(e.orig)) from e

    return user
@users_bp.route("/", methods=["GET"])
@requires_auth("users", [CIDCRole.ADMIN.value])
@use_args_with_pagination({}, user_schema)
@marshal_response(user_list_schema)
def list_users(args, pagination_args):
    """
    List users, forwarding the parsed pagination arguments to `Users.list`.

    The response body holds the listed users under "_items" and the value of
    `Users.count()` under "_meta.total". `args` is accepted but unused.
    """
    # Dict entries evaluate left to right: the listing runs before the count.
    return {
        "_items": Users.list(**pagination_args),
        "_meta": {"total": Users.count()},
    }
@users_bp.route("/<int:user>", methods=["GET"])
@requires_auth("users_item", [CIDCRole.ADMIN.value])
@with_lookup(Users, "user")
@marshal_response(user_schema)
def get_user(user: Users):
    """Return the single user record matching the id in the URL."""
    # `user` is supplied by @with_lookup rather than taken from request
    # input, so it can be returned as-is.
    return user
@users_bp.route("/<int:user>", methods=["PATCH"])
@requires_auth("users_item", [CIDCRole.ADMIN.value])
@with_lookup(Users, "user", check_etag=True)
@unmarshal_request(partial_user_schema, "user_updates", load_sqla=False)
@marshal_response(user_schema)
def update_user(user: Users, user_updates: dict):
    """Update a single user's information.

    Args:
        user: the existing user record supplied by @with_lookup.
        user_updates: mapping of field names to new values. It is used as a
            plain mapping here (the request is unmarshalled with
            load_sqla=False) and is mutated in place before being applied.

    Returns:
        The user record after `user.update` has been called on it.
    """
    # If a user is being awarded their first role, add an approval date
    if not user.role and "role" in user_updates:
        user_updates["approval_date"] = datetime.now()

    # If this user is being re-enabled after being disabled, update their last
    # access date to now so that they aren't disabled again tomorrow and
    # refresh their IAM permissions.
    # The explicit comparison with False is deliberate: a request that omits
    # "disabled" yields None from .get() and must not count as a re-enable.
    if user.disabled and user_updates.get("disabled") == False:
        user_updates["_accessed"] = datetime.now()
        # NOTE(review): permissions are granted before `user.update` applies
        # disabled=False -- confirm grant_iam_permissions does not depend on
        # the user's current `disabled` value.
        Permissions.grant_iam_permissions(user)

    user.update(changes=user_updates)

    # this is not user-input due to @with_lookup, so safe to return
    return user
@users_bp.route("/data_access_report", methods=["GET"])
@requires_auth("users_data_access_report", [CIDCRole.ADMIN.value])
def get_data_access_report():
    """
    Build the user data access report in memory and send it as an attachment.

    The report is written into a BytesIO buffer by
    `Users.get_data_access_report` and returned under a filename made from
    the environment name and today's date.
    """
    report_stream = io.BytesIO()
    Users.get_data_access_report(report_stream)
    # Rewind so the response is read from the start of what was written.
    report_stream.seek(0)

    filename = f"cidc_{ENV}_data_access_{datetime.now().date()}.xlsx"

    # NOTE(review): Flask 2.0 renamed `attachment_filename` to `download_name`
    # and later removed the old keyword -- confirm the pinned Flask version.
    return send_file(
        report_stream,
        as_attachment=True,
        attachment_filename=filename,
    )