From ad3f5071b91bfc990c8dcfafd6f12d0f657444bb Mon Sep 17 00:00:00 2001
From: Fabio Ambauen <1833932+open-dynaMIX@users.noreply.github.com>
Date: Mon, 19 Oct 2020 16:05:33 +0200
Subject: [PATCH] feat(cli): Regex based IP detection

This commit implements regex-based IP detection. It is intended for use
with log files where column-based detection doesn't work.

See RFC (#44) for more information.

Closes #44
---
 README.md |  4 ++-
 anonip.py | 70 +++++++++++++++++++++++++++++++++++++++++++----
 tests.py  | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++----
 3 files changed, 144 insertions(+), 12 deletions(-)

diff --git a/README.md b/README.md
index 81dfabf..7facec3 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,7 @@ Using shell redirects, it's also possible to rewrite existing log files.
  - Masks IP addresses in log files
  - Configurable amount of masked bits
  - The column containing the IP address can freely be chosen
+ - Alternatively, use a regex to point anonip to the location(s) of the IP(s). See [this RFC](https://github.com/DigitaleGesellschaft/Anonip/issues/44) for more information.
  - Works for both access.log- and error.log files
 
 ## Officially supported python versions
@@ -56,7 +57,7 @@ For python versions <3.3:
 ```
 usage: anonip.py [-h] [-4 INTEGER] [-6 INTEGER] [-i INTEGER] [-o FILE]
                  [--input FILE] [-c INTEGER [INTEGER ...]] [-l STRING]
-                 [-r STRING] [-p] [-d] [-v]
+                 [--regex STRING] [-r STRING] [-p] [-d] [-v]
 
 Anonip is a tool to anonymize IP-addresses in log files.
 
@@ -76,6 +77,7 @@ optional arguments:
                         default: 1)
   -l STRING, --delimiter STRING
                         log delimiter (default: " ")
+  --regex STRING        regex for detecting IP addresses (use instead of -c)
   -r STRING, --replace STRING
                         replacement string in case address parsing fails
                         (Example: 0.0.0.0)
diff --git a/anonip.py b/anonip.py
index 1f93677..e2b0142 100755
--- a/anonip.py
+++ b/anonip.py
@@ -41,6 +41,7 @@
 
 import argparse
 import logging
+import re
 import sys
 from io import open
 
@@ -79,6 +80,7 @@ def __init__(
         increment=0,
         delimiter=" ",
         replace=None,
+        regex=None,
         skip_private=False,
     ):
         """
@@ -99,8 +101,13 @@ def __init__(
         self.increment = increment
         self.delimiter = delimiter
         self.replace = replace
+        self.regex = regex
         self.skip_private = skip_private
 
+        self.process_method = self.process_line_column
+        if self.regex:
+            self.process_method = self.process_line_regex
+
     @property
     def columns(self):
         return self._columns
@@ -154,7 +161,8 @@ def run(self, input_file=None):
 
             logger.debug("Got line: %r", line)
 
-            yield self.process_line(line)
+            yield self.process_method(line)
+
             line = input_file.readline()
 
     def process_ip(self, ip):
@@ -177,9 +185,36 @@ def process_ip(self, ip):
                     )
             return trunc_ip
 
-    def process_line(self, line):
+    def process_line_regex(self, line):
+        """
+        Anonymize what the capture groups of ``self.regex`` find in a line.
+
+        A line the regex doesn't match is returned unchanged. Replacements
+        apply to every occurrence of the text in the line, not just the match.
+
+        :param line: str
+        :return: str
+        """
+        match = re.match(self.regex, line)
+        if not match:
+            logger.debug("Regex did not match!")
+            return line
+
+        # Longest capture first, ties alphabetically: a short capture can be a
+        # substring of a longer one, and plain set order varies between runs.
+        captured = {group for group in match.groups() if group}
+        for m in sorted(captured, key=lambda g: (-len(g), g)):
+            ip_str, ip = self.extract_ip(m)
+            if ip:
+                line = line.replace(ip_str, str(self.process_ip(ip)))
+            elif self.replace:
+                line = line.replace(m, self.replace)
+
+        return line
+
+    def process_line_column(self, line):
         """
-        This function processes a single line.
+        This function processes a single line based on the provided columns.
 
         It returns the anonymized log line as string.
 
@@ -298,6 +333,17 @@ def _validate_integer_ht_0(value):
     return value
 
 
+def regex_arg_type(value):
+    """Argparse ``type`` callable: compile ``value`` or reject it as invalid."""
+    try:
+        return re.compile(value)
+    except re.error as err:
+        reason = "must be a valid regex."
+        if hasattr(err, "msg"):  # pragma: no cover -- py27 has no "msg"
+            reason = "must be a valid regex. Error: {}".format(err.msg)
+        raise argparse.ArgumentTypeError(reason)
+
+
 def parse_arguments(args):
     """
     Parse all given arguments.
@@ -351,7 +397,6 @@ def parse_arguments(args):
         type=lambda x: _validate_integer_ht_0(x),
         help="assume IP address is in column n (1-based indexed; default: 1)",
     )
-    parser.set_defaults(column=[1])
     parser.add_argument(
         "-l",
         "--delimiter",
@@ -359,7 +404,12 @@ def parse_arguments(args):
         type=str,
         help='log delimiter (default: " ")',
     )
-    parser.set_defaults(delimiter=" ")
+    parser.add_argument(
+        "--regex",
+        metavar="STRING",
+        help="regex for detecting IP addresses (use optionally instead of -c)",
+        type=regex_arg_type,
+    )
     parser.add_argument(
         "-r",
         "--replace",
@@ -381,6 +431,15 @@ def parse_arguments(args):
 
     args = parser.parse_args(args)
 
+    if args.regex and (args.columns is not None or args.delimiter is not None):
+        raise parser.error(
+            'Ambiguous arguments: When using "--regex", "-c" and "-l" can\'t be used.'
+        )
+    if not args.regex and args.columns is None:
+        args.columns = [1]
+    if not args.regex and args.delimiter is None:
+        args.delimiter = " "
+
     return args
 
 
@@ -402,6 +461,7 @@ def main():
         args.increment,
         args.delimiter,
         args.replace,
+        args.regex,
         args.skip_private,
     )
 
diff --git a/tests.py b/tests.py
index 7762f0b..dde02b8 100755
--- a/tests.py
+++ b/tests.py
@@ -10,6 +10,7 @@
 
 import argparse
 import logging
+import re
 import sys
 from io import StringIO
 
@@ -81,7 +82,7 @@
 )
 def test_process_line(ip, v4mask, v6mask, expected):
     a = anonip.Anonip(ipv4mask=v4mask, ipv6mask=v6mask)
-    assert a.process_line(ip) == expected
+    assert a.process_line_column(ip) == expected
 
 
 @pytest.mark.parametrize(
@@ -93,7 +94,7 @@ def test_process_line(ip, v4mask, v6mask, expected):
 )
 def test_increment(ip, increment, expected):
     a = anonip.Anonip(increment=increment)
-    assert a.process_line(ip) == expected
+    assert a.process_line_column(ip) == expected
 
 
 @pytest.mark.parametrize(
@@ -128,25 +129,71 @@ def test_increment(ip, increment, expected):
 )
 def test_column(line, columns, expected):
     a = anonip.Anonip(columns=columns)
-    assert a.process_line(line) == expected
+    assert a.process_line_column(line) == expected
+
+
+@pytest.mark.parametrize(
+    "line,regex,expected,replace",  # replace: substitute for a non-IP capture
+    [
+        (  # alternation: only the first branch matches, other groups are None
+            '3.3.3.3 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
+            re.compile(r"(?:^(.*) - - |.* - somefixedstring: (.*) - .* - (.*))"),
+            '3.3.0.0 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
+            None,
+        ),
+        (  # single capture group in the middle of the line
+            "blabla/ 3.3.3.3 /blublu",
+            re.compile(r"^blabla/ (.*) /blublu"),
+            "blabla/ 3.3.0.0 /blublu",
+            None,
+        ),
+        (  # three capture groups, each holding a different IP
+            "1.1.1.1 - somefixedstring: 2.2.2.2 - some random stuff - 3.3.3.3",
+            re.compile(r"^(.*) - somefixedstring: (.*) - .* - (.*)"),
+            "1.1.0.0 - somefixedstring: 2.2.0.0 - some random stuff - 3.3.0.0",
+            None,
+        ),
+        (  # regex doesn't match: the line must come back unchanged
+            "some line that doesn't match the provided regex",
+            re.compile(r"^(.*) - somefixedstring: (.*) - .* - (.*)"),
+            "some line that doesn't match the provided regex",
+            None,
+        ),
+        (  # capture is not an IP and no replacement is set: unchanged
+            "match but no ip/ notanip /blublu",
+            re.compile(r"^match but no ip/ (.*) /blublu"),
+            "match but no ip/ notanip /blublu",
+            None,
+        ),
+        (  # capture is not an IP and a replacement is set: substituted
+            "match but no ip/ notanip /blublu",
+            re.compile(r"^match but no ip/ (.*) /blublu"),
+            "match but no ip/ yeah /blublu",
+            "yeah",
+        ),
+    ],
+)
+def test_regex(line, regex, expected, replace):
+    a = anonip.Anonip(regex=regex, replace=replace)
+    assert a.process_line_regex(line) == expected
 
 
 def test_replace():
     a = anonip.Anonip(replace="replacement")
-    assert a.process_line("bla something") == "replacement something"
+    assert a.process_line_column("bla something") == "replacement something"
 
 
 def test_delimiter():
     a = anonip.Anonip(delimiter=";")
     assert (
-        a.process_line("192.168.100.200;some;string;with;öéäü")
+        a.process_line_column("192.168.100.200;some;string;with;öéäü")
         == "192.168.96.0;some;string;with;öéäü"
     )
 
 
 def test_private():
     a = anonip.Anonip(skip_private=True)
-    assert a.process_line("192.168.100.200") == "192.168.100.200"
+    assert a.process_line_column("192.168.100.200") == "192.168.100.200"
 
 
 def test_run(monkeypatch):
@@ -181,6 +228,20 @@ def test_cli_generic_args(args, attribute, expected):
     assert getattr(anonip.parse_arguments(args), attribute) == expected
 
 
+@pytest.mark.parametrize(
+    "args",  # --regex combined with -c and/or -l has to be rejected
+    [
+        (["--regex", "test", "-c", "3"]),
+        (["--regex", "test", "-l", ";"]),
+        (["--regex", "test", "-l", ";", "-c", "4"]),
+    ],
+)
+def test_cli_args_ambiguity(args):
+    with pytest.raises(SystemExit) as e:
+        anonip.parse_arguments(args)
+    assert e.value.code == 2  # exit status used by argparse's parser.error()
+
+
 @pytest.mark.parametrize(
     "value,valid,bits",
     [
@@ -210,6 +271,15 @@ def test_cli_validate_integer_ht_0(value, valid):
             anonip._validate_integer_ht_0(value)
 
 
+@pytest.mark.parametrize("value,valid", [("valid (.*)", True), ("\\9", False)])
+def test_regex_arg_type(value, valid):
+    if valid:  # a valid pattern comes back compiled
+        assert anonip.regex_arg_type(value) == re.compile(value)
+    else:  # "\9" is a backreference to a group the pattern never defines
+        with pytest.raises(argparse.ArgumentTypeError):
+            anonip.regex_arg_type(value)
+
+
 @pytest.mark.parametrize("to_file", [False, True])
 @pytest.mark.parametrize("debug,log_level", [(False, 30), (True, 10)])
 def test_main(