Skip to content

Commit

Permalink
resources: add support for user impersonation
Browse files Browse the repository at this point in the history
  • Loading branch information
lnielsen committed Oct 17, 2023
1 parent 12cbc46 commit a0332db
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 5 deletions.
1 change: 1 addition & 0 deletions invenio_users_resources/resources/users/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class UsersResourceConfig(RecordResourceConfig):
"block": "/<id>/block",
"restore": "/<id>/restore",
"deactivate": "/<id>/deactivate",
"impersonate": "/<id>/impersonate",
}

request_view_args = {
Expand Down
21 changes: 16 additions & 5 deletions invenio_users_resources/resources/users/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

"""Users resource."""


from flask import g, send_file
from flask_resources import resource_requestctx, response_handler, route
from flask_security import impersonate_user
from invenio_records_resources.resources import RecordResource
from invenio_records_resources.resources.records.resource import (
request_search_args,
Expand Down Expand Up @@ -42,6 +42,7 @@ def create_url_rules(self):
route("POST", routes["block"], self.block),
route("POST", routes["restore"], self.restore),
route("POST", routes["deactivate"], self.deactivate),
route("POST", routes["impersonate"], self.impersonate),
route("GET", routes["moderation_search"], self.search_all),
]

Expand Down Expand Up @@ -98,7 +99,7 @@ def avatar(self):

@request_view_args
def approve(self):
"""Read a user."""
"""Approve user."""
self.service.approve(
id_=resource_requestctx.view_args["id"],
identity=g.identity,
Expand All @@ -107,7 +108,7 @@ def approve(self):

@request_view_args
def block(self):
"""Read a user."""
"""Block user."""
self.service.block(
id_=resource_requestctx.view_args["id"],
identity=g.identity,
Expand All @@ -116,7 +117,7 @@ def block(self):

@request_view_args
def restore(self):
"""Read a user."""
"""Restore user."""
self.service.restore(
id_=resource_requestctx.view_args["id"],
identity=g.identity,
Expand All @@ -125,9 +126,19 @@ def restore(self):

@request_view_args
def deactivate(self):
"""Read a user."""
"""Deactive user."""
self.service.deactivate(
id_=resource_requestctx.view_args["id"],
identity=g.identity,
)
return "", 200

@request_view_args
def impersonate(self):
"""Impersonate the user."""
user = self.service.can_impersonate(
g.identity, resource_requestctx.view_args["id"]
)
if user:
impersonate_user(user, g.identity)
return "", 200
1 change: 1 addition & 0 deletions invenio_users_resources/services/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class UsersPermissionPolicy(BasePermissionPolicy):
can_manage = [UserManager, SystemProcess()]
can_search_all = [UserManager, SystemProcess()]
can_read_system_details = [UserManager, SystemProcess()]
can_impersonate = [UserManager, SystemProcess()]


class GroupsPermissionPolicy(BasePermissionPolicy):
Expand Down
9 changes: 9 additions & 0 deletions invenio_users_resources/services/users/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,12 @@ def deactivate(self, identity, id_, uow=None):

uow.register(RecordIndexOp(user, indexer=self.indexer, index_refresh=True))
return True

def can_impersonate(self, identity, id_):
"""Check permissions if a user can be impersonated."""
user = UserAggregate.get_record(id_)
if user is None:
# return 403 even on empty resource due to security implications
raise PermissionDeniedError()
self.require_permission(identity, "impersonate", record=user)
return user.model.model_obj
26 changes: 26 additions & 0 deletions tests/resources/test_resources_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,32 @@ def test_management_permissions(client, headers, user_pub, db):
assert res.status_code == 403


def test_impersonate_user(client, headers, user_pub, user_moderator, db):
"""Tests user impersonation endpoint."""
client = user_moderator.login(client)
res = client.get(f"/users/{user_moderator.id}")
assert res.status_code == 200
assert res.json["is_current_user"] == True
res = client.get(f"/users/{user_pub.id}")
assert res.status_code == 200
assert res.json["is_current_user"] == False

res = client.post(f"/users/{user_pub.id}/impersonate", headers=headers)
assert res.status_code == 200

res = client.get(f"/users/{user_pub.id}")
assert res.status_code == 200
assert res.json["is_current_user"] == True

res = client.get(f"/users/{user_moderator.id}")
assert res.status_code == 403

res = client.post(f"/users/{user_moderator.id}/impersonate", headers=headers)
assert res.status_code == 403
res = client.post(f"/users/{user_pub.id}/impersonate", headers=headers)
assert res.status_code == 403


# TODO: test conditional requests
# TODO: test caching headers
# TODO: test invalid identifiers
Expand Down

0 comments on commit a0332db

Please sign in to comment.