From 421910108e3609f9acc4e415a78e7a1eb1786395 Mon Sep 17 00:00:00 2001
From: ppcad <45867125+ppcad@users.noreply.github.com>
Date: Thu, 5 Dec 2024 09:52:06 +0100
Subject: [PATCH] Pre-compile patterns, add cache and support ignoring case in generic resolver (#716)

* Pre-compile patterns, add cache and support ignoring case in generic resolver

* Black formatting

* Fix mistakes in generic resolver docstrings

* Clarify docstrings for generic resolver

* Refactor names in generic resolver

* Add test with dict resolve list value to generic resolver

* Add comments to generic resolver tests with cache metrics
---
 CHANGELOG.md                                  |   3 +
 .../processor/generic_resolver/processor.py   | 116 ++++++++-
 logprep/processor/generic_resolver/rule.py    |  55 +++-
 .../generic_resolver/test_generic_resolver.py | 243 ++++++++++++++++++
 4 files changed, 400 insertions(+), 17 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8f9ab6b5..dea730406 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,9 @@
 * fix `requester` documentation
 * replace `BaseException` with `Exception` for custom errors
 * refactor `generic_resolver` to validate rules on startup instead of application of each rule
+* regex pattern lists for the `generic_resolver` are pre-compiled
+* regex matching from lists in the `generic_resolver` is cached
+* matching in the `generic_resolver` can be case-insensitive
 * rewrite the helper method `add_field_to` such that it always raises an `FieldExistsWarning` instead of return a bool.
 * add new helper method `add_fields_to` to directly add multiple fields to one event
 * refactored some processors to make use of the new helper methods

diff --git a/logprep/processor/generic_resolver/processor.py b/logprep/processor/generic_resolver/processor.py
index e386deda2..2e3449123 100644
--- a/logprep/processor/generic_resolver/processor.py
+++ b/logprep/processor/generic_resolver/processor.py
@@ -25,8 +25,13 @@
 .. automodule:: logprep.processor.generic_resolver.rule
 """

-import re
+from functools import cached_property, lru_cache
+from typing import Optional

+from attrs import define, field, validators
+
+from logprep.abc.processor import Processor
+from logprep.metrics.metrics import GaugeMetric
 from logprep.processor.base.exceptions import FieldExistsWarning
 from logprep.processor.field_manager.processor import FieldManager
 from logprep.processor.generic_resolver.rule import GenericResolverRule
@@ -36,8 +41,79 @@ class GenericResolver(FieldManager):
     """Resolve values in documents by referencing a mapping list."""

+    @define(kw_only=True)
+    class Config(Processor.Config):
+        """GenericResolver config"""
+
+        max_cache_entries: Optional[int] = field(
+            validator=validators.optional(validators.instance_of(int)), default=0
+        )
+        """(Optional) Size of the cache for results when resolving from a list.
+        The cache can be disabled by setting this option to :code:`0`."""
+        cache_metrics_interval: Optional[int] = field(
+            validator=validators.optional(validators.instance_of(int)), default=1
+        )
+        """(Optional) Cache metrics won't be updated immediately.
+        Instead, updating is skipped for a number of events before its next update.
+        :code:`cache_metrics_interval` sets the number of events between updates (default: 1)."""

+    @define(kw_only=True)
+    class Metrics(FieldManager.Metrics):
+        """Tracks statistics about the generic resolver"""
+
+        new_results: GaugeMetric = field(
+            factory=lambda: GaugeMetric(
+                description="Number of newly resolved values",
+                name="generic_resolver_new_results",
+            )
+        )
+        """Number of newly resolved values"""
+
+        cached_results: GaugeMetric = field(
+            factory=lambda: GaugeMetric(
+                description="Number of values resolved from cache",
+                name="generic_resolver_cached_results",
+            )
+        )
+        """Number of resolved values from cache"""
+        num_cache_entries: GaugeMetric = field(
+            factory=lambda: GaugeMetric(
+                description="Number of resolved values in cache",
+                name="generic_resolver_num_cache_entries",
+            )
+        )
+        """Number of values in cache"""
+        cache_load: GaugeMetric = field(
+            factory=lambda: GaugeMetric(
+                description="Relative cache load.",
+                name="generic_resolver_cache_load",
+            )
+        )
+        """Relative cache load."""
+
+    __slots__ = ["_cache_metrics_skip_count"]
+
+    _cache_metrics_skip_count: int
+
     rule_class = GenericResolverRule

+    @property
+    def max_cache_entries(self):
+        """Returns the configured number of max_cache_entries"""
+        return self._config.max_cache_entries
+
+    @property
+    def cache_metrics_interval(self):
+        """Returns the configured cache_metrics_interval"""
+        return self._config.cache_metrics_interval
+
+    @cached_property
+    def _get_lru_cached_value_from_list(self):
+        """Returns the LRU-cached method to retrieve values from a list, if configured"""
+        if self.max_cache_entries <= 0:
+            return self._resolve_value_from_list
+        return lru_cache(maxsize=self.max_cache_entries)(self._resolve_value_from_list)
+
     def _apply_rules(self, event, rule):
         """Apply the given rule to the current event"""
         source_field_values = [
@@ -68,18 +144,46 @@ def _apply_rules(self, event, rule):
             )
         except FieldExistsWarning as error:
             conflicting_fields.extend(error.skipped_fields)
+
+        self._update_cache_metrics()
+
         if conflicting_fields:
             raise FieldExistsWarning(rule, event, conflicting_fields)

     def _find_content_of_first_matching_pattern(self, rule, source_field_value):
         if rule.resolve_from_file:
-            pattern = f'^{rule.resolve_from_file["pattern"]}$'
             replacements = rule.resolve_from_file["additions"]
-            matches = re.match(pattern, source_field_value)
+            matches = rule.pattern.match(source_field_value)
             if matches:
-                content = replacements.get(matches.group("mapping"))
+                mapping = matches.group("mapping")
+                if rule.ignore_case:
+                    mapping = mapping.upper()
+                content = replacements.get(mapping)
                 if content:
                     return content
-        for pattern, content in rule.resolve_list.items():
-            if re.search(pattern, source_field_value):
+        return self._get_lru_cached_value_from_list(rule, source_field_value)
+
+    def _resolve_value_from_list(
+        self, rule: GenericResolverRule, source_field_value: str
+    ) -> Optional[str]:
+        for pattern, content in rule.compiled_resolve_list:
+            if pattern.search(source_field_value):
                 return content
+
+    def _update_cache_metrics(self):
+        if self.max_cache_entries <= 0:
+            return
+        self._cache_metrics_skip_count += 1
+        if self._cache_metrics_skip_count < self.cache_metrics_interval:
+            return
+        self._cache_metrics_skip_count = 0
+
+        cache_info = self._get_lru_cached_value_from_list.cache_info()
+        self.metrics.new_results += cache_info.misses
+        self.metrics.cached_results += cache_info.hits
+        self.metrics.num_cache_entries += cache_info.currsize
+        self.metrics.cache_load += cache_info.currsize / cache_info.maxsize
+
+    def setup(self):
+        super().setup()
+        self._cache_metrics_skip_count = 0

diff --git a/logprep/processor/generic_resolver/rule.py b/logprep/processor/generic_resolver/rule.py
index e2ab14c44..255b7337b 100644
--- a/logprep/processor/generic_resolver/rule.py
+++ b/logprep/processor/generic_resolver/rule.py
@@ -38,6 +38,22 @@
 The resolver will check for the pattern and get value captured by the :code:`mapping` group.
 This captured value is then used in the list from the file.

+:code:`ignore_case` can be set to ignore the case when matching values that will be resolved.
+It is disabled by default. In the following example :code:`to_resolve: heLLo` would be resolved,
+since :code:`ignore_case` is set to true.
+
+..  code-block:: yaml
+    :linenos:
+    :caption: Example
+
+    filter: to_resolve
+    generic_resolver:
+      field_mapping:
+        to_resolve: resolved
+      resolve_list:
+        .*Hello.*: Greeting
+      ignore_case: true
+
 In the following example :code:`to_resolve` will be checked by the regex pattern
 :code:`\d*(?P<mapping>[a-z]+)\d*` and the list in :code:`path/to/resolve_mapping.yml`
 will be used to add new fields.
@@ -72,7 +88,10 @@
     :noindex:
 """

+import re
+from functools import cached_property
 from pathlib import Path
+from typing import Optional, Tuple, List

 from attrs import define, field, validators

@@ -98,16 +117,7 @@ class Config(FieldManagerRule.Config):
             ]
         )
         """Mapping in form of :code:`{SOURCE_FIELD: DESTINATION_FIELD}`"""
-        resolve_list: dict = field(
-            validator=[
-                validators.instance_of(dict),
-                validators.deep_mapping(
-                    key_validator=validators.instance_of(str),
-                    value_validator=validators.instance_of(str),
-                ),
-            ],
-            factory=dict,
-        )
+        resolve_list: dict = field(validator=[validators.instance_of(dict)], factory=dict)
         """lookup mapping in form of :code:`{REGEX_PATTERN_0: ADDED_VALUE_0, ...,
         REGEX_PATTERN_N: ADDED_VALUE_N}`"""
         resolve_from_file: dict = field(
@@ -125,6 +135,8 @@ class Config(FieldManagerRule.Config):
         a regex pattern which can be used to resolve values.
         The resolve list in the file at :code:`path` is then used in conjunction with
         the regex pattern in :code:`pattern`."""
+        ignore_case: Optional[bool] = field(validator=validators.instance_of(bool), default=False)
+        """(Optional) Ignore case when matching resolve values. Defaults to :code:`False`."""

     def __attrs_post_init__(self):
         if self.resolve_from_file:
@@ -142,8 +154,11 @@ def __attrs_post_init__(self):
                     isinstance(value, str) for value in add_dict.values()
                 ):
                     raise InvalidConfigurationError(
-                        f"Additions file '{file_path}' must be a dictionary with string values! (Rule ID: '{self.id}')",
+                        f"Additions file '{file_path}' must be a dictionary "
+                        f"with string values! (Rule ID: '{self.id}')",
                     )
+                if self.ignore_case:
+                    add_dict = {key.upper(): value for key, value in add_dict.items()}
                 self.resolve_from_file["additions"] = add_dict

     @property
     def resolve_list(self) -> dict:
         """Returns the resolve list"""
         return self._config.resolve_list

+    @cached_property
+    def compiled_resolve_list(self) -> List[Tuple[re.Pattern, str]]:
+        """Returns the resolve list as tuple pairs of compiled patterns and values"""
+        return [
+            (re.compile(pattern, re.I if self.ignore_case else 0), val)
+            for pattern, val in self._config.resolve_list.items()
+        ]
+
     @property
     def resolve_from_file(self) -> dict:
         """Returns the resolve file"""
         return self._config.resolve_from_file
+
+    @property
+    def ignore_case(self) -> bool:
+        """Returns whether matching should be case-insensitive"""
+        return self._config.ignore_case
+
+    @cached_property
+    def pattern(self) -> re.Pattern:
+        """Pattern used to resolve from file"""
+        return re.compile(f'^{self.resolve_from_file["pattern"]}$', re.I if self.ignore_case else 0)

diff --git a/tests/unit/processor/generic_resolver/test_generic_resolver.py b/tests/unit/processor/generic_resolver/test_generic_resolver.py
index b04f3d3d5..09e5d2e6e 100644
--- a/tests/unit/processor/generic_resolver/test_generic_resolver.py
+++ b/tests/unit/processor/generic_resolver/test_generic_resolver.py
@@ -3,7 +3,9 @@
 # pylint: disable=missing-docstring
 # pylint: disable=wrong-import-position
 from collections import OrderedDict
+from copy import deepcopy

+from logprep.factory import Factory
 from logprep.processor.base.exceptions import FieldExistsWarning
 from logprep.processor.generic_resolver.processor import GenericResolver
 from tests.unit.processor.base import BaseProcessorTestCase
@@ -17,6 +19,13 @@ class TestGenericResolver(BaseProcessorTestCase):
         "tree_config": "tests/testdata/unit/shared_data/tree_config.json",
     }

+    expected_metrics = [
+        "logprep_generic_resolver_new_results",
+        "logprep_generic_resolver_cached_results",
+        "logprep_generic_resolver_num_cache_entries",
+        "logprep_generic_resolver_cache_load",
+    ]
+
     @property
     def specific_rules_dirs(self):
         """Return the paths of the specific rules"""
@@ -51,6 +60,47 @@ def test_resolve_not_dotted_field_no_conflict_match(self):

         assert document == expected

+    def test_resolve_with_dict_value(self):
+        rule = {
+            "filter": "to_resolve",
+            "generic_resolver": {
+                "field_mapping": {"to_resolve": "resolved"},
+                "resolve_list": {".*HELLO\\d": {"Greeting": "Hello"}},
+            },
+        }
+
+        self._load_specific_rule(rule)
+
+        expected = {"to_resolve": "something HELLO1", "resolved": {"Greeting": "Hello"}}
+
+        document = {"to_resolve": "something HELLO1"}
+
+        self.object.process(document)
+
+        assert document == expected
+
+    def test_resolve_from_mapping_with_ignore_case(self):
+        rule = {
+            "filter": "to_resolve",
+            "generic_resolver": {
+                "field_mapping": {"to_resolve": "resolved"},
+                "resolve_list": {".*HELLO\\d": "Greeting"},
+                "ignore_case": True,
+            },
+        }
+
+        self._load_specific_rule(rule)
+
+        expected = {"to_resolve": "something HELLO1", "resolved": "Greeting"}
+        document = {"to_resolve": "something HELLO1"}
+        self.object.process(document)
+        assert document == expected
+
+        expected = {"to_resolve": "something hello1", "resolved": "Greeting"}
+        document = {"to_resolve": "something hello1"}
+        self.object.process(document)
+        assert document == expected
+
     def test_resolve_not_dotted_field_no_conflict_and_to_list_entries_match(
         self,
     ):
@@ -136,6 +186,33 @@ def test_resolve_dotted_field_no_conflict_match_from_file(
         assert document == expected

+    def test_resolve_from_file_with_ignore_case(
+        self,
+    ):
+        rule = {
+            "filter": "to_resolve",
+            "generic_resolver": {
+                "field_mapping": {"to_resolve": "resolved"},
+                "resolve_from_file": {
+                    "path": "tests/testdata/unit/generic_resolver/resolve_mapping.yml",
+                    "pattern": r"\d*(?P<mapping>[a-z]+)\d*",
+                },
+                "ignore_case": True,
+                "resolve_list": {"FOO": "BAR"},
+            },
+        }
+        self._load_specific_rule(rule)
+
+        expected = {"to_resolve": "ab", "resolved": "ab_server_type"}
+        document = {"to_resolve": "ab"}
+        self.object.process(document)
+        assert document == expected
+
+        expected = {"to_resolve": "AB", "resolved": "ab_server_type"}
+        document = {"to_resolve": "AB"}
+        self.object.process(document)
+        assert document == expected
+
     def test_resolve_from_file_and_from_list(self):
         rule = {
             "filter": "to_resolve_1 AND to_resolve_2",
@@ -409,3 +486,169 @@ def test_resolve_generic_and_multiple_match_first_only(self):
         self.object.process(document)

         assert document == expected
+
+    def test_resolve_from_cache_with_large_enough_cache(self):
+        """The metrics are mocked, and their values are the sums of previously added
+        cache values rather than the current cache values."""
+        config = deepcopy(self.CONFIG)
+        config["max_cache_entries"] = 10
+        self.object = Factory.create({"generic_resolver": config})
+
+        rule_dict = {
+            "filter": "to_resolve",
+            "generic_resolver": {
+                "field_mapping": {"to_resolve": "resolved"},
+                "resolve_list": {".+ar": "res_bar", ".+oo": "res_foo"},
+            },
+        }
+        event = {"to_resolve": "foo"}
+        self._load_specific_rule(rule_dict)
+        self.object.setup()
+
+        self.object.metrics.new_results = 0
+        self.object.metrics.cached_results = 0
+        self.object.metrics.num_cache_entries = 0
+
+        self.object.process(event)
+
+        assert self.object.metrics.new_results == 1
+        assert self.object.metrics.cached_results == 0
+        assert self.object.metrics.num_cache_entries == 1
+
+        self.object.process(event)
+
+        assert self.object.metrics.new_results == 2
+        assert self.object.metrics.cached_results == 1
+        assert self.object.metrics.num_cache_entries == 2
+
+        self.object.process({"to_resolve": "bar"})
+
+        assert self.object.metrics.new_results == 4
+        assert self.object.metrics.cached_results == 2
+        assert self.object.metrics.num_cache_entries == 4
+
+    def test_resolve_from_cache_with_cache_smaller_than_results(self):
+        """The metrics are mocked, and their values are the sums of previously added
+        cache values rather than the current cache values."""
+        config = deepcopy(self.CONFIG)
+        config["max_cache_entries"] = 1
+        self.object = Factory.create({"generic_resolver": config})
+
+        rule_dict = {
+            "filter": "to_resolve",
+            "generic_resolver": {
+                "field_mapping": {"to_resolve": "resolved"},
+                "resolve_list": {".+ar": "res_bar", ".+oo": "res_foo"},
+            },
+        }
+        event = {"to_resolve": "foo"}
+        self._load_specific_rule(rule_dict)
+        self.object.setup()
+
+        self.object.metrics.new_results = 0
+        self.object.metrics.cached_results = 0
+        self.object.metrics.num_cache_entries = 0
+
+        self.object.process(event)
+
+        assert self.object.metrics.new_results == 1
+        assert self.object.metrics.cached_results == 0
+        assert self.object.metrics.num_cache_entries == 1
+
+        self.object.process(event)
+
+        assert self.object.metrics.new_results == 2
+        assert self.object.metrics.cached_results == 1
+        assert self.object.metrics.num_cache_entries == 2
+
+        self.object.process({"to_resolve": "bar"})
+
+        assert self.object.metrics.new_results == 4
+        assert self.object.metrics.cached_results == 2
+        assert self.object.metrics.num_cache_entries == 3
+
+    def test_resolve_without_cache(self):
+        config = deepcopy(self.CONFIG)
+        config["max_cache_entries"] = 0
+        self.object = Factory.create({"generic_resolver": config})
+
+        rule_dict = {
+            "filter": "to_resolve",
+            "generic_resolver": {
+                "field_mapping": {"to_resolve": "resolved"},
+                "resolve_list": {".+ar": "res_bar", ".+oo": "res_foo"},
+            },
+        }
+        event = {"to_resolve": "foo"}
+        self._load_specific_rule(rule_dict)
+        self.object.setup()
+
+        self.object.metrics.new_results = 0
+        self.object.metrics.cached_results = 0
+        self.object.metrics.num_cache_entries = 0
+
+        self.object.process(event)
+
+        assert self.object.metrics.new_results == 0
+        assert self.object.metrics.cached_results == 0
+        assert self.object.metrics.num_cache_entries == 0
+
+        self.object.process(event)
+
+        assert self.object.metrics.new_results == 0
+        assert self.object.metrics.cached_results == 0
+        assert self.object.metrics.num_cache_entries == 0
+
+        self.object.process({"to_resolve": "bar"})
+
+        assert self.object.metrics.new_results == 0
+        assert self.object.metrics.cached_results == 0
+        assert self.object.metrics.num_cache_entries == 0
+
+    def test_resolve_from_cache_with_update_interval_2(self):
+        """The metrics are mocked, and their values are the sums of previously added
+        cache values rather than the current cache values."""
+        config = deepcopy(self.CONFIG)
+        config["cache_metrics_interval"] = 2
+        config["max_cache_entries"] = 10
+        self.object = Factory.create({"generic_resolver": config})
+
+        rule_dict = {
+            "filter": "to_resolve",
+            "generic_resolver": {
+                "field_mapping": {"to_resolve": "resolved"},
+                "resolve_list": {".+ar": "res_bar", ".+oo": "res_foo"},
+            },
+        }
+        event = {"to_resolve": "foo"}
+        other_event = {"to_resolve": "bar"}
+        self._load_specific_rule(rule_dict)
+        self.object.setup()
+
+        self.object.metrics.new_results = 0
+        self.object.metrics.cached_results = 0
+        self.object.metrics.num_cache_entries = 0
+
+        self.object.process(event)
+
+        assert self.object.metrics.new_results == 0
+        assert self.object.metrics.cached_results == 0
+        assert self.object.metrics.num_cache_entries == 0
+
+        self.object.process(event)
+
+        assert self.object.metrics.new_results == 1
+        assert self.object.metrics.cached_results == 1
+        assert self.object.metrics.num_cache_entries == 1
+
+        self.object.process(other_event)
+
+        assert self.object.metrics.new_results == 1
+        assert self.object.metrics.cached_results == 1
+        assert self.object.metrics.num_cache_entries == 1
+
+        self.object.process(other_event)
+
+        assert self.object.metrics.new_results == 3
+        assert self.object.metrics.cached_results == 3
+        assert self.object.metrics.num_cache_entries == 3
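
Taken together, the caching options introduced by this patch are configured on the processor, while :code:`ignore_case` is set per rule. A minimal pipeline sketch of the new options (the processor name and rule paths are illustrative, not part of the patch):

..  code-block:: yaml

    my_generic_resolver:
      type: generic_resolver
      specific_rules:
        - rules/generic_resolver/specific/
      generic_rules:
        - rules/generic_resolver/generic/
      # size of the LRU cache for list lookups; 0 (the default) disables caching
      max_cache_entries: 10000
      # fold cache statistics into the cache metrics only every 10th event
      cache_metrics_interval: 10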
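The cache works by wrapping the list-resolution method once per processor instance with :code:`functools.lru_cache` and periodically folding :code:`cache_info()` into cumulative counters. A self-contained sketch of that pattern (class and attribute names are simplified stand-ins, not the patch's API):

..  code-block:: python

    from functools import lru_cache


    class CachedResolver:
        """Sketch: wrap the uncached lookup once with lru_cache and
        periodically fold cache_info() into cumulative metric counters."""

        def __init__(self, max_cache_entries=10, cache_metrics_interval=1):
            self.max_cache_entries = max_cache_entries
            self.cache_metrics_interval = cache_metrics_interval
            self._skip_count = 0
            self.new_results = 0  # corresponds to generic_resolver_new_results
            self.cached_results = 0  # corresponds to generic_resolver_cached_results
            if max_cache_entries > 0:
                # the wrapped callable is created once, so the cache is
                # shared across all events processed by this instance
                self._resolve = lru_cache(maxsize=max_cache_entries)(self._resolve_uncached)
            else:
                self._resolve = self._resolve_uncached

        def _resolve_uncached(self, value: str) -> str:
            return value.upper()  # stand-in for the regex-list lookup

        def process(self, value: str) -> str:
            result = self._resolve(value)
            self._update_cache_metrics()
            return result

        def _update_cache_metrics(self):
            if self.max_cache_entries <= 0:
                return
            self._skip_count += 1
            if self._skip_count < self.cache_metrics_interval:
                return
            self._skip_count = 0
            info = self._resolve.cache_info()
            # cache_info() is cumulative, so these counters are sums of
            # cumulative snapshots, as the test docstrings above point out
            self.new_results += info.misses
            self.cached_results += info.hits


    resolver = CachedResolver()
    resolver.process("foo")  # miss
    resolver.process("foo")  # hit
    print(resolver.new_results, resolver.cached_results)  # -> 2 1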
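For file-based resolution with :code:`ignore_case`, the patch upper-cases the addition keys once at rule-build time and upper-cases the captured :code:`mapping` group before each lookup, so the pattern and the keys agree on case. A minimal sketch of that logic (the example mapping and pattern are taken from the tests above):

..  code-block:: python

    import re

    # keys are upper-cased once when the rule is built and ignore_case is set
    additions = {"AB": "ab_server_type"}
    # the file pattern is compiled once, with re.IGNORECASE when ignore_case is set
    pattern = re.compile(r"^\d*(?P<mapping>[a-z]+)\d*$", re.IGNORECASE)


    def resolve(value):
        match = pattern.match(value)
        if match:
            # upper-case the captured group so it matches the upper-cased keys
            return additions.get(match.group("mapping").upper())


    print(resolve("AB123"))  # -> ab_server_type
    print(resolve("ab123"))  # -> ab_server_type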