From 68ff0fd216f957e5ede4f217cd3a09032b4fb23a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johnny=20Marie=CC=81thoz?= Date: Wed, 19 Jan 2022 16:08:26 +0100 Subject: [PATCH] files: fix files permissions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adds files restriction for documents. * Adds files restriction for deposits. * Adds files restriction for organisations. * Adds files restriction for collections. * Fixes document restrictions. * Closes #746. Co-Authored-by: Johnny MarieĢthoz --- sonar/config.py | 8 ++ sonar/modules/collections/permissions.py | 61 +++++++- sonar/modules/deposits/permissions.py | 74 +++++++++- sonar/modules/documents/api.py | 23 ++- sonar/modules/documents/permissions.py | 113 ++++++++++++--- sonar/modules/documents/utils.py | 13 +- sonar/modules/documents/views.py | 29 +--- sonar/modules/organisations/permissions.py | 73 +++++++++- sonar/modules/permissions.py | 66 +++++++-- sonar/modules/users/permissions.py | 10 +- .../test_collections_files_permissions.py | 84 +++++++++++ .../test_collections_files_rest.py | 123 ++++++++++++++++ .../test_deposits_files_permissions.py | 84 +++++++++++ .../api/deposits/test_deposits_files_rest.py | 123 ++++++++++++++++ .../test_documents_files_permissions.py | 122 ++++++++++++++++ .../documents/test_documents_files_rest.py | 135 ++++++++++++++++++ .../documents/test_documents_permissions.py | 19 ++- .../test_organisations_files_permissions.py | 84 +++++++++++ .../test_organisations_files_rest.py | 123 ++++++++++++++++ tests/conftest.py | 26 ++++ tests/ui/test_permissions.py | 8 +- 21 files changed, 1311 insertions(+), 90 deletions(-) create mode 100644 tests/api/collections/test_collections_files_permissions.py create mode 100644 tests/api/collections/test_collections_files_rest.py create mode 100644 tests/api/deposits/test_deposits_files_permissions.py create mode 100644 tests/api/deposits/test_deposits_files_rest.py create mode 100644 tests/api/documents/test_documents_files_permissions.py create mode 100644 tests/api/documents/test_documents_files_rest.py create mode 100644 tests/api/organisations/test_organisations_files_permissions.py create mode 100644 tests/api/organisations/test_organisations_files_rest.py diff --git a/sonar/config.py b/sonar/config.py index af23fdeee..2c20c75b2 100644 --- a/sonar/config.py +++ b/sonar/config.py @@ -801,6 +801,14 @@ def _(x): FILES_REST_PERMISSION_FACTORY = \ 'sonar.modules.permissions.files_permission_factory' +SONAR_APP_FILES_REST_PERMISSION = { + 'doc': 'sonar.modules.documents.permissions:DocumentFilesPermission', + 'org': 'sonar.modules.organisations.permissions:OrganisationFilesPermission', + 'coll': 'sonar.modules.collections.permissions:CollectionFilesPermission', + 'depo': 'sonar.modules.deposits.permissions:DepositFilesPermission' +} +"""FilesPermission class to make the invenio files permission more flexible""" + # Database # ========================= DB_VERSIONING = False diff --git a/sonar/modules/collections/permissions.py b/sonar/modules/collections/permissions.py index 7398aa811..48612a3f4 100644 --- a/sonar/modules/collections/permissions.py +++ b/sonar/modules/collections/permissions.py @@ -17,10 +17,12 @@ """Record permissions.""" +from flask import request + from sonar.modules.documents.api import DocumentSearch from sonar.modules.organisations.api import current_organisation from sonar.modules.permissions import RecordPermission as BaseRecordPermission - +from sonar.modules.permissions import FilesPermission as BaseFilesPermission from .api import Record @@ -104,3 +106,60 @@ def delete(cls, user, record): return False return cls.read(user, record) + +class FilesPermission(BaseFilesPermission): + """Collection files permissions. + + Follows the same rules than the corresponding collection except for read + which is always accessible. + """ + + def __init__(self, record, func, user): + """Constructor.""" + super().__init__(record, func, user) + + @classmethod + def get_collection(cls): + """Get the collection record from the URL path.""" + pid_value_args = request.view_args.get('pid_value') + record = None + if pid_value_args: + (pid, record) = pid_value_args.data + return Record.get_record_by_pid(record['pid']) + + @classmethod + def read(cls, user, record): + """Read permission check. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + # allowed for anyone + return True + + @classmethod + def update(cls, user, record): + """Update permission check. + + Mainly the same behavior than the corresponding collection record. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + # Superuser is allowed. + if user and user.is_superuser: + return True + collection = cls.get_collection() + return collection and RecordPermission.update(user, collection) + + @classmethod + def delete(cls, user, record): + """Delete permission check. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + return cls.update(user, record) diff --git a/sonar/modules/deposits/permissions.py b/sonar/modules/deposits/permissions.py index 28ce3a2e4..e55c7924d 100644 --- a/sonar/modules/deposits/permissions.py +++ b/sonar/modules/deposits/permissions.py @@ -16,10 +16,11 @@ # along with this program. If not, see . """Permissions for deposits.""" +from flask import request from sonar.modules.deposits.api import DepositRecord from sonar.modules.organisations.api import current_organisation -from sonar.modules.permissions import RecordPermission +from sonar.modules.permissions import FilesPermission, RecordPermission class DepositPermission(RecordPermission): @@ -30,7 +31,7 @@ def list(cls, user, record=None): """List permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # At least for submitters logged users. @@ -44,7 +45,7 @@ def create(cls, user, record=None): """Create permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # No logged user. @@ -58,7 +59,7 @@ def read(cls, user, record): """Read permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # At least for submitters logged users. @@ -103,7 +104,7 @@ def update(cls, user, record): """Update permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # Same rules as read. @@ -114,7 +115,7 @@ def delete(cls, user, record): """Delete permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # Cannot delete a validated deposit. @@ -123,3 +124,64 @@ def delete(cls, user, record): # Same rules as read. return cls.read(user, record) + +class DepositFilesPermission(FilesPermission): + """Deposits files permissions. + + Follows the same rules than the corresponding deposit. + """ + + def __init__(self, record, func, user): + """Constructor.""" + super().__init__(record, func, user) + + @classmethod + def get_deposit(cls): + """Get the document from the URL path.""" + pid_value_args = request.view_args.get('pid_value') + record = None + if pid_value_args: + (pid, record) = pid_value_args.data + return DepositRecord.get_record_by_pid(record['pid']) + + @classmethod + def read(cls, user, record): + """Read permission check. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + # Superuser is allowed. + if user and user.is_superuser: + return True + deposit = cls.get_deposit() + return deposit and DepositPermission.read(user, deposit) + + @classmethod + def update(cls, user, record): + """Update permission check. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + # Superuser is allowed. + if user and user.is_superuser: + return True + deposit = cls.get_deposit() + return deposit and DepositPermission.update(user, deposit) + + @classmethod + def delete(cls, user, record): + """Delete permission check. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + # Superuser is allowed. + if user and user.is_superuser: + return True + deposit = cls.get_deposit() + return deposit and DepositPermission.delete(user, deposit) diff --git a/sonar/modules/documents/api.py b/sonar/modules/documents/api.py index 77bfed76a..d23724d2f 100644 --- a/sonar/modules/documents/api.py +++ b/sonar/modules/documents/api.py @@ -28,7 +28,7 @@ from sonar.modules.documents.minters import id_minter from sonar.modules.pdf_extractor.utils import extract_text_from_content from sonar.modules.utils import change_filename_extension, \ - create_thumbnail_from_file + create_thumbnail_from_file, get_current_ip, is_ip_in_list from ..api import SonarIndexer, SonarRecord, SonarSearch from ..ark.api import current_ark @@ -359,6 +359,27 @@ def is_open_access(self): return True + @property + def is_masked(self): + """Check if record is masked. + + :returns: True if record is masked + :rtype: boolean + """ + if not self.get('masked'): + return False + + if self['masked'] == 'masked_for_all': + return True + + if self['masked'] == 'masked_for_external_ips' and self.get( + 'organisation') and not is_ip_in_list( + get_current_ip(), self['organisation'][0].get( + 'allowedIps', '').split('\n')): + return True + + return False + def get_ark_resolver_url(self): """Get the ark resolver url. diff --git a/sonar/modules/documents/permissions.py b/sonar/modules/documents/permissions.py index 6d2c6019b..cc6dd9ba3 100644 --- a/sonar/modules/documents/permissions.py +++ b/sonar/modules/documents/permissions.py @@ -18,10 +18,13 @@ """Permissions for documents.""" from flask import request +from invenio_files_rest.models import Bucket from sonar.modules.documents.api import DocumentRecord from sonar.modules.organisations.api import current_organisation -from sonar.modules.permissions import RecordPermission +from sonar.modules.permissions import FilesPermission, RecordPermission + +from .utils import get_file_restriction, get_organisations class DocumentPermission(RecordPermission): @@ -32,7 +35,7 @@ def list(cls, user, record=None): """List permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ view = request.args.get('view') @@ -53,7 +56,7 @@ def create(cls, user, record=None): """Create permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # Only for moderators users @@ -64,42 +67,42 @@ def read(cls, user, record): """Read permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ - # Only for moderator users. - if not user or not user.is_moderator: - return False - # Superuser is allowed. - if user.is_superuser: + if user and user.is_superuser: return True document = DocumentRecord.get_record_by_pid(record['pid']) document = document.replace_refs() - - return document.has_organisation(current_organisation['pid']) + # Moderator can read their own documents. + if user and user.is_moderator: + if document.has_organisation(current_organisation['pid']): + return True + return not document.is_masked @classmethod def update(cls, user, record): """Update permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ - # Same rules as read - can_read = cls.read(user, record) - - if not can_read: + if not user or not user.is_moderator: return False - if user.is_admin: + if user.is_superuser: return True document = DocumentRecord.get_record_by_pid(record['pid']) document = document.replace_refs() + # Moderator can update their own documents. + if not document.has_organisation(current_organisation['pid']): + return False + user = user.replace_refs() return document.has_subdivision(user.get('subdivision', {}).get('pid')) @@ -109,7 +112,7 @@ def delete(cls, user, record): """Delete permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # Delete is only for admins. @@ -118,3 +121,77 @@ def delete(cls, user, record): # Same rules as update return cls.update(user, record) + +class DocumentFilesPermission(FilesPermission): + """Documents files permissions. + + Write operations are limited to admin users, read depends if the + corresponding document is masked or if the file is restricted. + """ + + def __init__(self, record, func, user): + """Constructor.""" + super().__init__(record, func, user) + + @classmethod + def get_document(cls): + """Get the document from the URL path.""" + pid_value_args = request.view_args.get('pid_value') + record = None + if pid_value_args: + (pid, record) = pid_value_args.data + return DocumentRecord.get_record_by_pid(record['pid']) + + @classmethod + def read(cls, user, record): + """Read permission check. + + :param user: current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + # Superuser is allowed. + if user and user.is_superuser: + return True + document = cls.get_document() + if document and not DocumentPermission.read(user, document): + return False + + # read the bucket metadata + # TODO: filter the list of files based on embargo + if isinstance(record, Bucket): + return True + file_type = document.files[record.key]['type'] + if file_type == 'fulltext' and (not user or not user.is_admin): + return False + file_restriction = get_file_restriction( + document.files[record.key], + get_organisations(document), + True + ) + return not file_restriction.get('restricted', True) + + @classmethod + def update(cls, user, record): + """Update permission check. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + if user and user.is_superuser: + return True + document = cls.get_document() + if document: + return DocumentPermission.update(user, document) + return False + + @classmethod + def delete(cls, user, record): + """Delete permission check. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + return cls.update(user, record) diff --git a/sonar/modules/documents/utils.py b/sonar/modules/documents/utils.py index 5d3cf2172..9a0f67f6d 100644 --- a/sonar/modules/documents/utils.py +++ b/sonar/modules/documents/utils.py @@ -109,14 +109,14 @@ def get_file_links(file, record): return links -def get_file_restriction(file, organisations): +def get_file_restriction(file, organisations, for_permission=True): """Check if current file can be displayed. :param file: File dict. :param organisations: List of organisations. + :param for_permission: True if it is used to compute permissions. :returns: Object containing result as boolean and possibly embargo date. """ - def is_allowed_by_scope(): """Check if file is fully restricted or only outside organisation. @@ -149,21 +149,20 @@ def is_allowed_by_scope(): not_restricted = {'restricted': False, 'date': None} - # We are in admin, no restrictions are applied. - if not request.args.get('view') and not request.view_args.get( - 'view') and request.url_rule.rule != '/oai2d': + # We are in admin, no restrictions are applied except for permissions. + if not for_permission and not request.args.get('view') and\ + not request.view_args.get('view') and\ + request.url_rule.rule != '/oai2d': return not_restricted # No specific access or specific access is open access if not file.get('access') or file['access'] == 'coar:c_abf2': return not_restricted - # Access is embargoed if file['access'] == 'coar:c_f1cf': # No embargo date if not file.get('embargo_date'): return not_restricted - try: embargo_date = datetime.strptime(file['embargo_date'], '%Y-%m-%d') except Exception: diff --git a/sonar/modules/documents/views.py b/sonar/modules/documents/views.py index 67aba9b60..a6e690cb8 100644 --- a/sonar/modules/documents/views.py +++ b/sonar/modules/documents/views.py @@ -30,8 +30,7 @@ from sonar.modules.documents.utils import has_external_urls_for_files, \ populate_files_properties from sonar.modules.utils import format_date, \ - get_bibliographic_code_from_language, get_current_ip, get_language_value, \ - is_ip_in_list + get_bibliographic_code_from_language, get_language_value from .utils import publication_statement_text @@ -71,28 +70,6 @@ def detail(pid, record, template=None, **kwargs): :param \*\*kwargs: Additional view arguments based on URL rule. :returns: The rendered template. """ - - def is_masked(record): - """Check if record is masked. - - :param record: Record object - :returns: True if record is masked - :rtype: boolean - """ - if not record.get('masked'): - return False - - if record['masked'] == 'masked_for_all': - return True - - if record['masked'] == 'masked_for_external_ips' and record.get( - 'organisation') and not is_ip_in_list( - get_current_ip(), record['organisation'][0].get( - 'allowedIps', '').split('\n')): - return True - - return False - # Add restriction, link and thumbnail to files if record.get('_files'): # Check if organisation's record forces to point file to an external @@ -117,8 +94,8 @@ def is_masked(record): record = record.replace_refs() # Record is masked - if is_masked(record): - abort(404) + if record.is_masked: + abort(403) # Send signal when record is viewed record_viewed.send( diff --git a/sonar/modules/organisations/permissions.py b/sonar/modules/organisations/permissions.py index 9edba9ab6..aac873327 100644 --- a/sonar/modules/organisations/permissions.py +++ b/sonar/modules/organisations/permissions.py @@ -16,9 +16,10 @@ # along with this program. If not, see . """Permissions for organisations.""" - +from flask import request from sonar.modules.organisations.api import current_organisation -from sonar.modules.permissions import RecordPermission +from sonar.modules.permissions import FilesPermission, RecordPermission +from .api import OrganisationRecord class OrganisationPermission(RecordPermission): @@ -29,7 +30,7 @@ def list(cls, user, record=None): """List permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # Only for admin users at least. @@ -43,7 +44,7 @@ def create(cls, user, record=None): """Create permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # Only superuser can create an organisation. @@ -54,7 +55,7 @@ def read(cls, user, record): """Read permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # Only for admin users @@ -73,7 +74,7 @@ def update(cls, user, record): """Update permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # Same rules as read. @@ -84,7 +85,65 @@ def delete(cls, user, record): """Delete permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True if action can be done. """ return bool(user and user.is_superuser) + +class OrganisationFilesPermission(FilesPermission): + """Organisation files permissions. + + Follows the same rules than the corresponding organisation except for read + which is always accessible. + """ + + def __init__(self, record, func, user): + """Constructor.""" + super().__init__(record, func, user) + + @classmethod + def get_organisation(cls): + """Get the organisation from the URL path.""" + pid_value_args = request.view_args.get('pid_value') + record = None + if pid_value_args: + (pid, record) = pid_value_args.data + return OrganisationRecord.get_record_by_pid(record['pid']) + + @classmethod + def read(cls, user, record): + """Read permission check. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + # allowed for anyone + return True + + @classmethod + def update(cls, user, record): + """Update permission check. + + Mainly the same behavior than the corresponding organisation record. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + # Superuser is allowed. + if user and user.is_superuser: + return True + organisation = cls.get_organisation() + return organisation and \ + OrganisationPermission.update(user, organisation) + + @classmethod + def delete(cls, user, record): + """Delete permission check. + + :param user: Current user record. + :param record: Record to check. + :returns: True is action can be done. + """ + return cls.update(user, record) diff --git a/sonar/modules/permissions.py b/sonar/modules/permissions.py index 749764e76..042c07fb4 100644 --- a/sonar/modules/permissions.py +++ b/sonar/modules/permissions.py @@ -19,11 +19,11 @@ from functools import wraps -from flask import abort, current_app +from flask import abort, current_app, request from flask_login import current_user from flask_principal import ActionNeed, RoleNeed from invenio_access import Permission -from invenio_records_rest.utils import obj_or_import_string +from invenio_base.utils import obj_or_import_string from sonar.modules.users.api import current_user_record @@ -34,6 +34,10 @@ RoleNeed('admin'), RoleNeed('superuser')) +moderator_access_permission = Permission(RoleNeed('moderator'), + RoleNeed('admin'), + RoleNeed('superuser')) + # Allow access without permission check allow_access = type('Allow', (), {'can': lambda self: True})() @@ -135,13 +139,31 @@ def admin_permission_factory(admin_view): return superuser_access_permission -def files_permission_factory(*kwargs): - """Files rest permission factory.""" +def files_permission_factory(obj, action): + """Files rest permission factory. + + admin: all + moderator: write(organisation only) + read(organisation + as anonymous for other) + anonymous: write(none) + read(record masked, restricted: embargo etc, text never) + """ + # Permission is allowed for all actions. if current_app.config.get('SONAR_APP_DISABLE_PERMISSION_CHECKS'): return allow_access - # TODO: Add checks for accessing files and buckets - return allow_access + pid_type = None + if request.view_args.get('pid_value'): + (pid, rec) = request.view_args.get('pid_value').data + pid_type = pid.pid_type + print('===============>', + current_app.config.get( + 'SONAR_APP_FILES_REST_PERMISSION', {}) + ) + files_permission_cls = obj_or_import_string( + current_app.config.get( + 'SONAR_APP_FILES_REST_PERMISSION', {}).get(pid_type, FilesPermission)) + return files_permission_cls.create_permission(obj, action) def wiki_edit_permission(): @@ -210,7 +232,7 @@ def list(cls, user, record=None): """List permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ if not user: @@ -223,7 +245,7 @@ def create(cls, user, record=None): """Create permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ if not user: @@ -236,7 +258,7 @@ def read(cls, user, record): """Read permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ if not user: @@ -249,7 +271,7 @@ def update(cls, user, record): """Update permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ if not user: @@ -262,10 +284,32 @@ def delete(cls, user, record): """Delete permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ if not user: return False return has_superuser_access() + + +class FilesPermission(RecordPermission): + """Record files permissions for CRUD operations.""" + + list_actions = [] + create_actions = [] + read_actions = [ + 'bucket-read', 'bucket-read-versions', + 'bucket-listmultiparts', + 'object-read', 'multipart-read', + 'object-read-version' + ] + update_actions = [ + 'location-update', + 'bucket-update', + ] + delete_actions = [ + 'object-delete', + 'object-delete-version', + 'multipart-delete' + ] diff --git a/sonar/modules/users/permissions.py b/sonar/modules/users/permissions.py index 7a628d35c..7b6cb5012 100644 --- a/sonar/modules/users/permissions.py +++ b/sonar/modules/users/permissions.py @@ -31,7 +31,7 @@ def list(cls, user, record=None): """List permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ if not user: @@ -44,7 +44,7 @@ def create(cls, user, record=None): """Create permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ if not user: @@ -57,7 +57,7 @@ def read(cls, user, record): """Read permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ if not user: @@ -92,7 +92,7 @@ def update(cls, user, record): """Update permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # Same rules as read permission. @@ -103,7 +103,7 @@ def delete(cls, user, record): """Delete permission check. :param user: Current user record. - :param recor: Record to check. + :param record: Record to check. :returns: True is action can be done. """ # At least for admin logged users. diff --git a/tests/api/collections/test_collections_files_permissions.py b/tests/api/collections/test_collections_files_permissions.py new file mode 100644 index 000000000..d63c5e869 --- /dev/null +++ b/tests/api/collections/test_collections_files_permissions.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Swiss Open Access Repository +# Copyright (C) 2022 RERO +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"""Test collections files permissions.""" + + +from flask import url_for +from flask_security import url_for_security +from invenio_accounts.testutils import login_user_via_session + + +def test_update_delete(client, superuser, admin, moderator, + submitter, user, collection, pdf_file): + """Test permissions for uploading and deleting files.""" + file_name = 'test.pdf' + users = [superuser, admin, moderator, submitter, user, None] + + # upload the file + url_file_content = url_for( + 'invenio_records_files.coll_object_api', + pid_value=collection.get('pid'), key=file_name) + for u, status in zip(users, [200, 200, 404, 404, 404, 404]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == status + if status == 200: + # the delete return status is no content + status = 204 + res = client.delete(url_file_content) + assert res.status_code == status + + +def test_read_metadata(client, superuser, admin, moderator, + submitter, user, collection_with_file): + """Test read files permissions.""" + + users = [superuser, admin, moderator, submitter, user, None] + url_files = url_for( + 'invenio_records_files.coll_bucket_api', + pid_value=collection_with_file.get('pid')) + for u, status in zip(users, [200, 200, 200, 200, 200, 200]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_files) + assert res.status_code == status + + +def test_read_content(client, superuser, admin, moderator, + submitter, user, collection_with_file): + """Test read collections permissions.""" + + file_name = 'test1.pdf' + users = [ + superuser, admin, moderator, submitter, user, None] + url_file_content = url_for( + 'invenio_records_files.coll_object_api', + pid_value=collection_with_file.get('pid'), + key=file_name) + for u, status in zip(users, [200, 200, 200, 200, 200, 200]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_file_content) + assert res.status_code == status diff --git a/tests/api/collections/test_collections_files_rest.py b/tests/api/collections/test_collections_files_rest.py new file mode 100644 index 000000000..d8c8277b1 --- /dev/null +++ b/tests/api/collections/test_collections_files_rest.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# +# Swiss Open Access Repository +# Copyright (C) 2022 RERO +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"""Test REST endpoint for collections.""" + + +from flask import url_for + + +def test_get_content(app, client, collection_with_file): + """Test get existing file content.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + + file_name = 'test1.pdf' + + # get the pdf file + url_file_content = url_for( + 'invenio_records_files.coll_object_api', + pid_value=collection_with_file.get('pid'), key=file_name) + res = client.get(url_file_content) + assert res.status_code == 200 + assert res.content_type == 'application/octet-stream' + assert res.content_length > 0 + + +def test_get_metadata(app, client, collection_with_file): + """Test get existing file metadata.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + + # get all files metadata of a given collection + url_files = url_for( + 'invenio_records_files.coll_bucket_api', + pid_value=collection_with_file.get('pid')) + res = client.get(url_files) + assert res.status_code == 200 + file_keys = ['test1.pdf'] + assert set(file_keys) == set( + [f.get('key') for f in res.json.get('contents')]) + + # get a specific file metadata of a given collection + for f_key in file_keys: + url_file = url_for( + 'invenio_records_files.coll_bucket_api', + pid_value=collection_with_file.get('pid'), key=f_key) + res = client.get(url_file) + assert res.status_code == 200 + + +def test_put_delete(app, client, collection, pdf_file): + """Test create and delete a file.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + file_name = 'test.pdf' + + # upload the file + url_file_content = url_for( + 'invenio_records_files.coll_object_api', pid_value=collection.get('pid'), key=file_name) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == 200 + + # get the version id + url_file = url_for( + 'invenio_records_files.coll_bucket_api', + pid_value=collection.get('pid'), key=file_name) + res = client.get(url_file) + content = res.json.get('contents') + + assert len(content) == 1 + content = content.pop() + version_id = content.get('version_id') + + # upload a second version + url_file_content = url_for( + 'invenio_records_files.coll_object_api', pid_value=collection.get('pid'), key=file_name) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == 200 + + # get the new version id + url_file = url_for( + 'invenio_records_files.coll_bucket_api', + pid_value=collection.get('pid'), key=file_name) + res = client.get(url_file) + content = res.json.get('contents') + + assert len(content) == 1 + content = content.pop() + assert version_id != content.get('version_id') + + # delete the file + url_delete_file_content = url_for( + 'invenio_records_files.coll_object_api', + pid_value=collection.get('pid'), key=file_name) + res = client.delete(url_delete_file_content) + assert res.status_code == 204 + + # the file does not exists anymore + res = client.get(url_file_content) + assert res.status_code == 404 + + # the file does not exists anymore + url_file = url_for( + 'invenio_records_files.coll_bucket_api', + pid_value=collection.get('pid'), key=file_name) + res = client.get(url_file) + assert res.status_code == 200 + content = res.json.get('contents') + # fulltext, thumbnail: file has been removed + # TODO: is it the right approach? Do we need to remove files and + # the bucket? + assert len(content) == 0 diff --git a/tests/api/deposits/test_deposits_files_permissions.py b/tests/api/deposits/test_deposits_files_permissions.py new file mode 100644 index 000000000..a65554afd --- /dev/null +++ b/tests/api/deposits/test_deposits_files_permissions.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Swiss Open Access Repository +# Copyright (C) 2022 RERO +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"""Test deposits files permissions.""" + + +from flask import url_for +from flask_security import url_for_security +from invenio_accounts.testutils import login_user_via_session + + +def test_update_delete(client, superuser, admin, moderator, + submitter, user, deposit, pdf_file): + """Test permissions for uploading and deleting files.""" + file_name = 'test.pdf' + users = [superuser, admin, moderator, submitter, user, None] + + # upload the file + url_file_content = url_for( + 'invenio_records_files.depo_object_api', + pid_value=deposit.get('pid'), key=file_name) + for u, status in zip(users, [200, 200, 200, 404, 404, 404]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == status + if status == 200: + # the delete return status is no content + status = 204 + res = client.delete(url_file_content) + assert res.status_code == status + + +def test_read_metadata(client, superuser, admin, moderator, + submitter, user, deposit): + """Test read files permissions.""" + + users = [superuser, admin, moderator, submitter, user, None] + url_files = url_for( + 'invenio_records_files.depo_bucket_api', + pid_value=deposit.get('pid')) + for u, status in zip(users, [200, 200, 200, 404, 404, 404]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_files) + assert res.status_code == status + + +def test_read_content(client, superuser, admin, moderator, + submitter, user, deposit): + """Test read deposits permissions.""" + + file_name = 'main.pdf' + users = [ + superuser, admin, moderator, submitter, user, None] + url_file_content = url_for( + 'invenio_records_files.depo_object_api', + pid_value=deposit.get('pid'), + key=file_name) + for u, status in zip(users, [200, 200, 200, 404, 404, 404]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_file_content) + assert res.status_code == status diff --git a/tests/api/deposits/test_deposits_files_rest.py b/tests/api/deposits/test_deposits_files_rest.py new file mode 100644 index 000000000..cc72f6feb --- /dev/null +++ b/tests/api/deposits/test_deposits_files_rest.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# +# Swiss Open Access Repository +# Copyright (C) 2022 RERO +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"""Test REST endpoint for deposits.""" + + +from flask import url_for + + +def test_get_content(app, client, deposit): + """Test get existing file content.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + + file_name = 'main.pdf' + + # get the pdf file + url_file_content = url_for( + 'invenio_records_files.depo_object_api', + pid_value=deposit.get('pid'), key=file_name) + res = client.get(url_file_content) + assert res.status_code == 200 + assert res.content_type == 'application/octet-stream' + assert res.content_length > 0 + + +def test_get_metadata(app, client, deposit): + """Test get existing file metadata.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + + # get all files metadata of a given deposit + url_files = url_for( + 'invenio_records_files.depo_bucket_api', + pid_value=deposit.get('pid')) + res = client.get(url_files) + assert res.status_code == 200 + file_keys = ['main.pdf', 'additional.pdf'] + assert set(file_keys) == set( + [f.get('key') for f in res.json.get('contents')]) + + # get a specific file metadata of a given deposit + for f_key in file_keys: + url_file = url_for( + 'invenio_records_files.depo_bucket_api', + pid_value=deposit.get('pid'), key=f_key) + res = client.get(url_file) + assert res.status_code == 200 + + +def test_put_delete(app, client, deposit, pdf_file): + """Test create and delete a file.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + file_name = 'test.pdf' + + # upload the file + url_file_content = url_for( + 'invenio_records_files.depo_object_api', pid_value=deposit.get('pid'), key=file_name) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == 200 + + # get the version id + url_file = url_for( + 'invenio_records_files.depo_bucket_api', + pid_value=deposit.get('pid'), key=file_name) + res = client.get(url_file) + content = res.json.get('contents') + + assert len(content) == 3 + content = content.pop() + version_id = content.get('version_id') + + # upload a second version + url_file_content = url_for( + 'invenio_records_files.depo_object_api', pid_value=deposit.get('pid'), key=file_name) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == 200 + + # get the new version id + url_file = url_for( + 'invenio_records_files.depo_bucket_api', + pid_value=deposit.get('pid'), key=file_name) + res = client.get(url_file) + content = res.json.get('contents') + + assert len(content) == 3 + content = content.pop() + assert version_id != content.get('version_id') + + # delete the file + url_delete_file_content = url_for( + 'invenio_records_files.depo_object_api', + pid_value=deposit.get('pid'), key=file_name) + res = client.delete(url_delete_file_content) + assert res.status_code == 204 + + # the file does not exists anymore + res = client.get(url_file_content) + assert res.status_code == 404 + + # the file does not exists anymore + url_file = url_for( + 'invenio_records_files.depo_bucket_api', + pid_value=deposit.get('pid'), key=file_name) + res = client.get(url_file) + assert res.status_code == 200 + content = res.json.get('contents') + # fulltext, thumbnail: file has been removed + # TODO: is it the right approach? Do we need to remove files and + # the bucket? + assert len(content) == 2 diff --git a/tests/api/documents/test_documents_files_permissions.py b/tests/api/documents/test_documents_files_permissions.py new file mode 100644 index 000000000..8137ebf15 --- /dev/null +++ b/tests/api/documents/test_documents_files_permissions.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +# +# Swiss Open Access Repository +# Copyright (C) 2022 RERO +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"""Test documents files permissions.""" + + +from flask import url_for +from flask_security import url_for_security +from invenio_accounts.testutils import login_user_via_session + + +def test_update_delete(client, superuser, admin, moderator, + submitter, user, document, pdf_file): + """Test permissions for uploading and deleting files.""" + file_name = 'test.pdf' + users = [superuser, admin, moderator, submitter, user, None] + + # upload the file + url_file_content = url_for( + 'invenio_records_files.doc_object_api', pid_value=document.get('pid'), key=file_name) + for u, status in zip(users, [200, 200, 200, 404, 404, 404]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == status + if status == 200: + # the delete return status is no content + status = 204 + res = client.delete(url_file_content) + assert res.status_code == status + + + +def test_read_metadata(client, superuser, admin, moderator, + submitter, user, document_with_file): + """Test read files permissions.""" + + users = [superuser, admin, moderator, submitter, user, None] + url_files = url_for( + 'invenio_records_files.doc_bucket_api', + pid_value=document_with_file.get('pid')) + for u, status in zip(users, [200, 200, 200, 200, 200, 200]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_files) + assert res.status_code == status + + # Masked document + document_with_file['masked'] = 'masked_for_all' + document_with_file.commit() + for u, status in zip(users, [200, 200, 200, 404, 404, 404]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_files) + assert res.status_code == status + +def test_read_content(client, superuser, admin, moderator, + submitter, user, user_without_org, document_with_file): + """Test read documents permissions.""" + + file_name = 'test1.pdf' + users = [ + superuser, admin, moderator, submitter, user, user_without_org, None] + url_file_content = url_for( + 'invenio_records_files.doc_object_api', + pid_value=document_with_file.get('pid'), + key=file_name) + open_access_code = "coar:c_abf2" + document_with_file.files[file_name]['access'] = open_access_code + document_with_file.commit() + for u, status in zip(users, [200, 200, 200, 200, 200, 200, 200]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_file_content) + assert res.status_code == status + # Masked document + document_with_file['masked'] = 'masked_for_all' + document_with_file.commit() + for u, status in zip(users, [200, 200, 200, 404, 404, 404, 404]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_file_content) + assert res.status_code == status + + del document_with_file['masked'] + + # restricted files + restricted_code = "coar:c_16ec" + document_with_file.files[file_name]['access'] = restricted_code + document_with_file.files[file_name]['restricted_outside_organisation'] = True + document_with_file.commit() + for u, status in zip(users, [200, 200, 200, 200, 200, 404, 404]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_file_content) + assert res.status_code == status diff --git a/tests/api/documents/test_documents_files_rest.py b/tests/api/documents/test_documents_files_rest.py new file mode 100644 index 000000000..0ecbf4423 --- /dev/null +++ b/tests/api/documents/test_documents_files_rest.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +# +# Swiss Open Access Repository +# Copyright (C) 2022 RERO +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"""Test REST endpoint for documents.""" + + +from flask import url_for + + +def test_get_content(app, client, document_with_file): + """Test get existing file content.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + + file_name = 'test1.pdf' + fulltext_file_name = 'test1-pdf.txt' + thumbnail_file_name = 'test1-pdf.jpg' + + # get the pdf file + url_file_content = url_for('invenio_records_files.doc_object_api', pid_value=document_with_file.get('pid'), key=file_name) + res = client.get(url_file_content) + assert res.status_code == 200 + assert res.content_type == 'application/octet-stream' + assert res.content_length > 0 + + url_file_content = url_for('invenio_records_files.doc_object_api', pid_value=document_with_file.get('pid'), key=fulltext_file_name) + res = client.get(url_file_content) + assert res.status_code == 200 + assert res.content_type == 'text/plain; charset=utf-8' + assert res.content_length > 0 + + url_file_content = url_for('invenio_records_files.doc_object_api', pid_value=document_with_file.get('pid'), key=thumbnail_file_name) + res = client.get(url_file_content) + assert res.status_code == 200 + assert res.content_type == 'image/jpeg' + assert res.content_length > 0 + +def test_get_metadata(app, client, document_with_file): + """Test get existing file metadata.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + + # get all files metadata of a given document + url_files = url_for( + 'invenio_records_files.doc_bucket_api', + pid_value=document_with_file.get('pid')) + res = client.get(url_files) + assert res.status_code == 200 + file_keys = ['test1.pdf', 'test1-pdf.txt', 'test1-pdf.jpg'] + assert set(file_keys) == set( + [f.get('key') for f in res.json.get('contents')]) + + # get a specific file metadata of a given document + for f_key in file_keys: + url_file = url_for( + 'invenio_records_files.doc_bucket_api', + pid_value=document_with_file.get('pid'), key=f_key) + res = client.get(url_file) + assert res.status_code == 200 + + +def test_put_delete(app, client, document, pdf_file): + """Test create and delete a file.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + file_name = 'test.pdf' + + # upload the file + url_file_content = url_for( + 'invenio_records_files.doc_object_api', pid_value=document.get('pid'), key=file_name) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == 200 + + # get the version id + url_file = url_for( + 'invenio_records_files.doc_bucket_api', + pid_value=document.get('pid'), key=file_name) + res = client.get(url_file) + content = res.json.get('contents') + + # file, fulltext, thumbnail + assert len(content) == 3 + content = content.pop() + version_id = content.get('version_id') + + # upload a second version + url_file_content = url_for( + 'invenio_records_files.doc_object_api', pid_value=document.get('pid'), key=file_name) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == 200 + + # get the new version id + url_file = url_for( + 'invenio_records_files.doc_bucket_api', + pid_value=document.get('pid'), key=file_name) + res = client.get(url_file) + content = res.json.get('contents') + + # file, fulltext, thumbnail + assert len(content) == 3 + content = content.pop() + assert version_id != content.get('version_id') + + # delete the file + url_delete_file_content = url_for( + 'invenio_records_files.doc_object_api', pid_value=document.get('pid'), key=file_name) + res = client.delete(url_delete_file_content) + assert res.status_code == 204 + + # the file does not exists anymore + res = client.get(url_file_content) + assert res.status_code == 404 + + # the file does not exists anymore + url_file = url_for( + 'invenio_records_files.doc_bucket_api', + pid_value=document.get('pid'), key=file_name) + res = client.get(url_file) + assert res.status_code == 200 + content = res.json.get('contents') + # fulltext, thumbnail: file has been removed + # TODO: is it the right approach? Do we need to remove files and + # the bucket? + assert len(content) == 2 diff --git a/tests/api/documents/test_documents_permissions.py b/tests/api/documents/test_documents_permissions.py index 61162b4cc..af3b7ab75 100644 --- a/tests/api/documents/test_documents_permissions.py +++ b/tests/api/documents/test_documents_permissions.py @@ -19,6 +19,7 @@ import json +import mock from flask import url_for from invenio_accounts.testutils import login_user_via_session from invenio_pidstore.models import PersistentIdentifier, PIDStatus @@ -147,19 +148,29 @@ def test_read(client, document, make_user, superuser, admin, moderator, # Not logged res = client.get( url_for('invenio_records_rest.doc_item', pid_value=document['pid'])) - assert res.status_code == 401 + assert res.status_code == 200 + + # masked document + magic_mock = mock.MagicMock(return_value=True) + with mock.patch( + 'sonar.modules.documents.api.DocumentRecord.is_masked', + magic_mock + ): + res = client.get( + url_for('invenio_records_rest.doc_item', pid_value=document['pid'])) + assert res.status_code == 401 # Logged as user login_user_via_session(client, email=user['email']) res = client.get( url_for('invenio_records_rest.doc_item', pid_value=document['pid'])) - assert res.status_code == 403 + assert res.status_code == 200 # Logged as submitter login_user_via_session(client, email=submitter['email']) res = client.get( url_for('invenio_records_rest.doc_item', pid_value=document['pid'])) - assert res.status_code == 403 + assert res.status_code == 200 # Logged as moderator login_user_via_session(client, email=moderator['email']) @@ -194,7 +205,7 @@ def test_read(client, document, make_user, superuser, admin, moderator, login_user_via_session(client, email=other_admin['email']) res = client.get( url_for('invenio_records_rest.doc_item', pid_value=document['pid'])) - assert res.status_code == 403 + assert res.status_code == 200 # Logged as superuser login_user_via_session(client, email=superuser['email']) diff --git a/tests/api/organisations/test_organisations_files_permissions.py b/tests/api/organisations/test_organisations_files_permissions.py new file mode 100644 index 000000000..a81e43c83 --- /dev/null +++ b/tests/api/organisations/test_organisations_files_permissions.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Swiss Open Access Repository +# Copyright (C) 2022 RERO +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"""Test organisations files permissions.""" + + +from flask import url_for +from flask_security import url_for_security +from invenio_accounts.testutils import login_user_via_session + + +def test_update_delete(client, superuser, admin, moderator, + submitter, user, organisation, pdf_file): + """Test permissions for uploading and deleting files.""" + file_name = 'test.pdf' + users = [superuser, admin, moderator, submitter, user, None] + + # upload the file + url_file_content = url_for( + 'invenio_records_files.org_object_api', + pid_value=organisation.get('pid'), key=file_name) + for u, status in zip(users, [200, 200, 404, 404, 404, 404]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == status + if status == 200: + # the delete return status is no content + status = 204 + res = client.delete(url_file_content) + assert res.status_code == status + + +def test_read_metadata(client, superuser, admin, moderator, + submitter, user, organisation_with_file): + """Test read files permissions.""" + + users = [superuser, admin, moderator, submitter, user, None] + url_files = url_for( + 'invenio_records_files.org_bucket_api', + pid_value=organisation_with_file.get('pid')) + for u, status in zip(users, [200, 200, 200, 200, 200, 200]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_files) + assert res.status_code == status + + +def test_read_content(client, superuser, admin, moderator, + submitter, user, organisation_with_file): + """Test read organisations permissions.""" + + file_name = 'test1.pdf' + users = [ + superuser, admin, moderator, submitter, user, None] + url_file_content = url_for( + 'invenio_records_files.org_object_api', + pid_value=organisation_with_file.get('pid'), + key=file_name) + for u, status in zip(users, [200, 200, 200, 200, 200, 200]): + if u: + login_user_via_session(client, email=u['email']) + else: + client.get(url_for_security('logout')) + res = client.get(url_file_content) + assert res.status_code == status diff --git a/tests/api/organisations/test_organisations_files_rest.py b/tests/api/organisations/test_organisations_files_rest.py new file mode 100644 index 000000000..8cfc04605 --- /dev/null +++ b/tests/api/organisations/test_organisations_files_rest.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# +# Swiss Open Access Repository +# Copyright (C) 2022 RERO +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"""Test REST endpoint for organisations.""" + + +from flask import url_for + + +def test_get_content(app, client, organisation_with_file): + """Test get existing file content.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + + file_name = 'test1.pdf' + + # get the pdf file + url_file_content = url_for( + 'invenio_records_files.org_object_api', + pid_value=organisation_with_file.get('pid'), key=file_name) + res = client.get(url_file_content) + assert res.status_code == 200 + assert res.content_type == 'application/octet-stream' + assert res.content_length > 0 + + +def test_get_metadata(app, client, organisation_with_file): + """Test get existing file metadata.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + + # get all files metadata of a given organisation + url_files = url_for( + 'invenio_records_files.org_bucket_api', + pid_value=organisation_with_file.get('pid')) + res = client.get(url_files) + assert res.status_code == 200 + file_keys = ['test1.pdf'] + assert set(file_keys) == set( + [f.get('key') for f in res.json.get('contents')]) + + # get a specific file metadata of a given organisation + for f_key in file_keys: + url_file = url_for( + 'invenio_records_files.org_bucket_api', + pid_value=organisation_with_file.get('pid'), key=f_key) + res = client.get(url_file) + assert res.status_code == 200 + + +def test_put_delete(app, client, organisation, pdf_file): + """Test create and delete a file.""" + app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) + file_name = 'test.pdf' + + # upload the file + url_file_content = url_for( + 'invenio_records_files.org_object_api', pid_value=organisation.get('pid'), key=file_name) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == 200 + + # get the version id + url_file = url_for( + 'invenio_records_files.org_bucket_api', + pid_value=organisation.get('pid'), key=file_name) + res = client.get(url_file) + content = res.json.get('contents') + + assert len(content) == 1 + content = content.pop() + version_id = content.get('version_id') + + # upload a second version + url_file_content = url_for( + 'invenio_records_files.org_object_api', pid_value=organisation.get('pid'), key=file_name) + res = client.put(url_file_content, input_stream=open(pdf_file, 'rb')) + assert res.status_code == 200 + + # get the new version id + url_file = url_for( + 'invenio_records_files.org_bucket_api', + pid_value=organisation.get('pid'), key=file_name) + res = client.get(url_file) + content = res.json.get('contents') + + assert len(content) == 1 + content = content.pop() + assert version_id != content.get('version_id') + + # delete the file + url_delete_file_content = url_for( + 'invenio_records_files.org_object_api', + pid_value=organisation.get('pid'), key=file_name) + res = client.delete(url_delete_file_content) + assert res.status_code == 204 + + # the file does not exists anymore + res = client.get(url_file_content) + assert res.status_code == 404 + + # the file does not exists anymore + url_file = url_for( + 'invenio_records_files.org_bucket_api', + pid_value=organisation.get('pid'), key=file_name) + res = client.get(url_file) + assert res.status_code == 200 + content = res.json.get('contents') + # fulltext, thumbnail: file has been removed + # TODO: is it the right approach? Do we need to remove files and + # the bucket? + assert len(content) == 0 diff --git a/tests/conftest.py b/tests/conftest.py index becb05a46..8bbe848ba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -164,6 +164,15 @@ def organisation(make_organisation): """Create an organisation.""" return make_organisation('org') +@pytest.fixture() +def organisation_with_file(organisation, pdf_file): + """Create an organisation with a file attached.""" + with open(pdf_file, 'rb') as file: + organisation.add_file(file.read(), + 'test1.pdf' + ) + organisation.commit() + return organisation @pytest.fixture() def roles(base_app, db): @@ -256,6 +265,12 @@ def user_without_role(app, db): return user +@pytest.fixture() +def user_without_org(make_user): + """Create user without organisation.""" + return make_user('user', None) + + @pytest.fixture() def user(make_user): """Create user.""" @@ -850,6 +865,17 @@ def collection(app, db, es, admin, organisation, collection_json): return collection +@pytest.fixture() +def collection_with_file(collection, pdf_file): + """Create a collection with a file attached.""" + with open(pdf_file, 'rb') as file: + collection.add_file(file.read(), + 'test1.pdf' + ) + collection.commit() + return collection + + @pytest.fixture() def subdivision_json(): """Subdivision JSON.""" diff --git a/tests/ui/test_permissions.py b/tests/ui/test_permissions.py index d43dd5b79..e7b5d04d8 100644 --- a/tests/ui/test_permissions.py +++ b/tests/ui/test_permissions.py @@ -159,14 +159,14 @@ def test_unknown_permission_factory(app, client, superuser, document): assert not record_permission_factory(document, 'unknown').can() -def test_files_permission_factory(app, client, admin): +def test_files_permission_factory(app, client, superuser): """Test files permission factory.""" app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=True) - assert files_permission_factory().can() + assert files_permission_factory(None, 'object-read').can() app.config.update(SONAR_APP_DISABLE_PERMISSION_CHECKS=False) - login_user_via_view(client, email=admin['email'], password='123456') - assert files_permission_factory().can() + login_user_via_view(client, email=superuser['email'], password='123456') + assert files_permission_factory(None, 'object-read').can() def test_admin_permission_factory(app, client, superuser):