Skip to content

Commit

Permalink
queryparser: add RestrictedTerm to exclude allowed phrases by permission
Browse files Browse the repository at this point in the history
  • Loading branch information
kpsherva authored and zzacharo committed Nov 8, 2024
1 parent e297181 commit 4a0d858
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""

from invenio_i18n import gettext as _
from luqum.tree import Phrase, Word
from luqum.visitor import TreeTransformer

from invenio_records_resources.services.errors import QuerystringValidationError
Expand All @@ -33,15 +34,68 @@ def term_name(self):
"""Get the term name."""
return self._term_name

def map_word(self, node):
def map_word(self, node, **kwargs):
"""Modify a word node."""
return self._word_fun(node) if self._word_fun else node

def map_phrase(self, node):
def map_phrase(self, node, **kwargs):
"""Modify a phrase node."""
return self._phrase_fun(node) if self._phrase_fun else node


class RestrictedTerm:
"""Class used to apply specific permissions to search.
ex. "internal_notes.note": RestrictedTerm(system_access_permission)
restricts the internal_notes.note field from being searched by anyone else
than system process
"""

def __init__(self, permission):
"""Constructor."""
self.permission = permission

def allows(self, identity):
"""Check the permission for the whole term."""
return self.permission.allows(identity)


class RestrictedTermValue:
"""Class used to apply specific permissions to search specific words.
ex. "_exists_": RestrictedTermValue(
system_access_permission, word=word_internal_notes
),
will rewrite the searched field based on given permission
"""

def __init__(self, permission, word=None, phrase=None):
"""Constructor."""
self.permission = permission
self._word_fun = word
self._phrase_fun = phrase

def map_word(self, node, context, **kwargs):
"""Modify a word node."""
is_restricted = not self.permission.allows(context["identity"])
if not is_restricted:
return node
if self._word_fun and is_restricted:
return self._word_fun(node)
else:
return Word("")

def map_phrase(self, node, context, **kwargs):
"""Modify a phrase node."""
is_restricted = not self.permission.allows(context["identity"])
if not is_restricted:
return node
if self._phrase_fun and is_restricted:
return self._phrase_fun(node)
else:
return Phrase("")


class SearchFieldTransformer(TreeTransformer):
"""Transform from user-friendly field names to internal field names."""

Expand All @@ -57,10 +111,24 @@ def visit_search_field(self, node, context):
term_name = self._mapping.get(node.name, node.name)
field_value_mapper = None

# TODO if you need another if clause here, please refactor
if isinstance(term_name, FieldValueMapper):
field_value_mapper = term_name
term_name = field_value_mapper.term_name

if isinstance(term_name, RestrictedTermValue):
field_value_mapper = term_name
term_name = node.name
if isinstance(term_name, RestrictedTerm):
allows = term_name.allows(context["identity"])
term_name = node.name
# field_value_mapper is left as None on purpose - if the permission
# allows, we don't map any query, we allow it "as is"
if not allows:
raise QuerystringValidationError(
_("Invalid search field: {field_name}.").format(
field_name=node.name
)
)
# If a allow list exists, the term must be allowed to be queried.
if self._allow_list and not term_name in self._allow_list:
raise QuerystringValidationError(
Expand All @@ -79,9 +147,9 @@ def visit_search_field(self, node, context):
def visit_word(self, node, context):
"""Visit a word term."""
mapper = context.get("field_value_mapper")
yield node if mapper is None else mapper.map_word(node)
yield node if mapper is None else mapper.map_word(node, context=context)

def visit_phrase(self, node, context):
"""Visit a phrase term."""
mapper = context.get("field_value_mapper")
yield node if mapper is None else mapper.map_phrase(node)
yield node if mapper is None else mapper.map_phrase(node, context=context)
91 changes: 89 additions & 2 deletions tests/services/test_service_queryparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
"""Query parser tests."""

import pytest
from invenio_access.permissions import system_identity
from luqum.tree import Phrase
from flask_principal import ActionNeed
from invenio_access.permissions import Permission, SystemRoleNeed, system_identity
from luqum.tree import Phrase, Word

from invenio_records_resources.services.records.queryparser import (
FieldValueMapper,
QueryParser,
SearchFieldTransformer,
)
from invenio_records_resources.services.records.queryparser.transformer import (
RestrictedTerm,
RestrictedTermValue,
)


@pytest.fixture()
Expand Down Expand Up @@ -194,3 +199,85 @@ def lol(node):
assert p(system_identity).parse(query).to_dict() == {
"query_string": {"query": transformed_query}
}


@pytest.mark.parametrize(
"query,transformed_query",
[
# notice the input and the output of the test is the same.
# Search field transformer does not rewrite the search query, but raises exception handled in:
# https://github.com/inveniosoftware/invenio-records-resources/blob/e297181296eef56bdfb0d1486c3e570fa741a0aa/invenio_records_resources/services/records/queryparser/query.py#L136
# but multimatch will not pick up any results, due to default fields we are allowed to search on
# integration test for this behaviour is in invenio-rdm-records
("internal_notes.note:abc", "internal_notes.note:abc"),
],
)
def test_querystring_restricted_term(query, transformed_query, identity_simple, app):
"""Invalid syntax falls back to multi match query."""

sysadmin_permission = Permission(SystemRoleNeed("system_process"))

def word_internal_notes(node):
"""Rewrite internal notes value."""
if not node.value.startswith("internal_notes"):
return node
return Word("")

p = QueryParser.factory(
mapping={
"internal_notes.note": RestrictedTerm(sysadmin_permission),
"_exists_": RestrictedTermValue(
sysadmin_permission, word=word_internal_notes
),
},
tree_transformer_cls=SearchFieldTransformer,
)

parser = p(system_identity)

assert parser.parse(query).to_dict() == {"query_string": {"query": query}}

parser = p(identity_simple)

assert parser.parse(query).to_dict() == {
"multi_match": {"query": transformed_query}
}


@pytest.mark.parametrize(
"query,transformed_query",
[
("_exists_:internal_notes", "_exists_:"),
("_exists_:metadata", "_exists_:metadata"),
],
)
def test_querystring_restricted_term(query, transformed_query, identity_simple, app):
"""Invalid syntax falls back to multi match query."""

sysadmin_permission = Permission(SystemRoleNeed("system_process"))

def word_internal_notes(node):
"""Rewrite internal notes value."""
if not node.value.startswith("internal_notes"):
return node
return Word("")

p = QueryParser.factory(
mapping={
"internal_notes.note": RestrictedTerm(sysadmin_permission),
"_exists_": RestrictedTermValue(
sysadmin_permission, word=word_internal_notes
),
},
tree_transformer_cls=SearchFieldTransformer,
)

parser = p(system_identity)

assert parser.parse(query).to_dict() == {"query_string": {"query": query}}

parser = p(identity_simple)

assert parser.parse(query).to_dict() == {
"query_string": {"query": transformed_query}
}

0 comments on commit 4a0d858

Please sign in to comment.