Skip to content

Commit

Permalink
feat(cli): Regex based IP detection
Browse files Browse the repository at this point in the history
This commit implements regex-based IP detection. It is intended for use
with log files where column-based detection doesn't work.

See RFC (#44) for more information.

Closes #44
  • Loading branch information
open-dynaMIX committed Dec 17, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent b516ea9 commit 228a7c2
Showing 3 changed files with 174 additions and 12 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ Using shell redirects, it's also possible to rewrite existing log files.
- Masks IP addresses in log files
- Configurable amount of masked bits
- The column containing the IP address can freely be chosen
- Alternatively use a regex to point anonip to the location(s) of the IP(s). See [this RFC](https://github.com/DigitaleGesellschaft/Anonip/issues/44) for more information.
- Works for both access.log- and error.log files

## Officially supported python versions
@@ -57,7 +58,7 @@ For python versions <3.3:
```
usage: anonip.py [-h] [-4 INTEGER] [-6 INTEGER] [-i INTEGER] [-o FILE]
[--input FILE] [-c INTEGER [INTEGER ...]] [-l STRING]
[-r STRING] [-p] [-d] [-v]
[--regex STRING [STRING ...]] [-r STRING] [-p] [-d] [-v]
Anonip is a tool to anonymize IP-addresses in log files.
@@ -77,13 +78,18 @@ optional arguments:
default: 1)
-l STRING, --delimiter STRING
log delimiter (default: " ")
--regex STRING [STRING ...]
regex for detecting IP addresses (use instead of -c)
-r STRING, --replace STRING
replacement string in case address parsing fails
(Example: 0.0.0.0)
-p, --skip-private do not mask addresses in private ranges. See IANA
Special-Purpose Address Registry.
-d, --debug print debug messages
-v, --version show program's version number and exit
Example-usage in apache-config:
CustomLog "| /path/to/anonip.py [OPTIONS] --output /path/to/log" combined
```

## Usage
77 changes: 72 additions & 5 deletions anonip.py
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@

import argparse
import logging
import re
import sys
from io import open

@@ -78,6 +79,7 @@ def __init__(
increment=0,
delimiter=" ",
replace=None,
regex=None,
skip_private=False,
):
"""
@@ -98,8 +100,13 @@ def __init__(
self.increment = increment
self.delimiter = delimiter
self.replace = replace
self.regex = regex
self.skip_private = skip_private

self.process_method = self.process_line_column
if self.regex:
self.process_method = self.process_line_regex

@property
def columns(self):
return self._columns
@@ -153,7 +160,8 @@ def run(self, input_file=None):

logger.debug("Got line: %r", line)

yield self.process_line(line)
yield self.process_method(line)

line = input_file.readline()

def process_ip(self, ip):
@@ -176,9 +184,36 @@ def process_ip(self, ip):
)
return trunc_ip

def process_line(self, line):
def process_line_regex(self, line):
    """
    This function processes a single line based on the provided regex.

    The regex is applied with ``re.match``, i.e. anchored at the start of
    the line. Every capture group that took part in the match is treated
    as the location of an IP address. Only the text at the captured
    position is rewritten; identical text elsewhere in the line is left
    untouched.

    If an IP can be extracted from the captured text, it is replaced by
    the processed (masked) IP. Otherwise, if a replacement string is
    configured, the whole captured text is replaced by it. Lines that do
    not match the regex are returned unchanged.

    :param line: str
    :return: str
    """
    match = re.match(self.regex, line)
    if not match:
        logger.debug("Regex did not match!")
        return line

    # Collect the unique, non-empty spans of all groups. Groups that did
    # not participate in the match report (-1, -1) and are dropped here,
    # as are groups that captured an empty string.
    spans = set()
    for index in range(1, len(match.groups()) + 1):
        start, end = match.span(index)
        if start < end:
            spans.add((start, end))

    # Splice from right to left so that a rewrite never shifts the
    # offsets of spans that still have to be processed. `boundary` is the
    # start of the leftmost region rewritten so far; a span reaching past
    # it overlaps an already rewritten region (nested groups) and is
    # skipped. For equal starts the outer (longer) span is tried first.
    boundary = len(line)
    for start, end in sorted(spans, reverse=True):
        if end > boundary:
            continue
        captured = line[start:end]
        # NOTE(review): assumes extract_ip returns (ip_str, ip) where
        # ip_str is the part of `captured` holding the IP - confirm.
        ip_str, ip = self.extract_ip(captured)
        if ip:
            trunc_ip = self.process_ip(ip)
            new_text = captured.replace(ip_str, str(trunc_ip), 1)
        elif self.replace:
            new_text = self.replace
        else:
            continue
        line = line[:start] + new_text + line[end:]
        boundary = start

    return line

def process_line_column(self, line):
"""
This function processes a single line.
This function processes a single line based on the provided columns.
It returns the anonymized log line as string.
@@ -297,6 +332,18 @@ def _validate_integer_ht_0(value):
return value


def regex_arg_type(value):
    """
    Argparse type callable that checks whether the value is a valid regex.

    The value is only test-compiled; on success the original string is
    returned unchanged.

    :param value: str
    :return: str
    :raises argparse.ArgumentTypeError: if the value does not compile
    """
    try:
        re.compile(value)
    except re.error as exc:
        if not hasattr(exc, "msg"):
            # py27: re.error carries no "msg" attribute
            raise argparse.ArgumentTypeError("must be a valid regex.")
        # pragma: no cover
        raise argparse.ArgumentTypeError(
            "must be a valid regex. Error: {}".format(exc.msg)
        )
    return value


def parse_arguments(args):
"""
Parse all given arguments.
@@ -350,15 +397,20 @@ def parse_arguments(args):
type=lambda x: _validate_integer_ht_0(x),
help="assume IP address is in column n (1-based indexed; default: 1)",
)
parser.set_defaults(column=[1])
parser.add_argument(
"-l",
"--delimiter",
metavar="STRING",
type=str,
help='log delimiter (default: " ")',
)
parser.set_defaults(delimiter=" ")
parser.add_argument(
"--regex",
metavar="STRING",
nargs="+",
help="regex for detecting IP addresses (use optionally instead of -c)",
type=regex_arg_type,
)
parser.add_argument(
"-r",
"--replace",
@@ -380,6 +432,20 @@ def parse_arguments(args):

args = parser.parse_args(args)

if args.regex and (args.columns is not None or args.delimiter is not None):
raise parser.error(
'Ambiguous arguments: When using "--regex", "-c" and "-l" can\'t be used.'
)
if not args.regex and args.columns is None:
args.columns = [1]
if not args.regex and args.delimiter is None:
args.delimiter = " "
if args.regex:
try:
args.regex = re.compile(r"|".join(args.regex))
except re.error: # pragma: no cover
raise argparse.ArgumentTypeError("Failed to compile concatenated regex!")

return args


@@ -402,6 +468,7 @@ def main():
args.increment,
args.delimiter,
args.replace,
args.regex,
args.skip_private,
)

101 changes: 95 additions & 6 deletions tests.py
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@

import argparse
import logging
import re
import sys
from io import StringIO

@@ -78,7 +79,7 @@
)
def test_process_line(ip, v4mask, v6mask, expected):
a = anonip.Anonip(ipv4mask=v4mask, ipv6mask=v6mask)
assert a.process_line(ip) == expected
assert a.process_line_column(ip) == expected


@pytest.mark.parametrize(
@@ -90,7 +91,7 @@ def test_process_line(ip, v4mask, v6mask, expected):
)
def test_increment(ip, increment, expected):
a = anonip.Anonip(increment=increment)
assert a.process_line(ip) == expected
assert a.process_line_column(ip) == expected


@pytest.mark.parametrize(
@@ -125,25 +126,71 @@ def test_increment(ip, increment, expected):
)
def test_column(line, columns, expected):
a = anonip.Anonip(columns=columns)
assert a.process_line(line) == expected
assert a.process_line_column(line) == expected


@pytest.mark.parametrize(
    "line,regex,expected,replace",
    [
        # alternation: only the first branch matches, other groups stay None
        (
            '3.3.3.3 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
            re.compile(r"(?:^(.*) - - |.* - somefixedstring: (.*) - .* - (.*))"),
            '3.3.0.0 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
            None,
        ),
        # single group in the middle of the line
        (
            "blabla/ 3.3.3.3 /blublu",
            re.compile(r"^blabla/ (.*) /blublu"),
            "blabla/ 3.3.0.0 /blublu",
            None,
        ),
        # several groups, each holding a different IP
        (
            "1.1.1.1 - somefixedstring: 2.2.2.2 - some random stuff - 3.3.3.3",
            re.compile(r"^(.*) - somefixedstring: (.*) - .* - (.*)"),
            "1.1.0.0 - somefixedstring: 2.2.0.0 - some random stuff - 3.3.0.0",
            None,
        ),
        # regex does not match: line is returned untouched
        (
            "some line that doesn't match the provided regex",
            re.compile(r"^(.*) - somefixedstring: (.*) - .* - (.*)"),
            "some line that doesn't match the provided regex",
            None,
        ),
        # regex matches, but the group is no IP and no replacement is set
        (
            "match but no ip/ notanip /blublu",
            re.compile(r"^match but no ip/ (.*) /blublu"),
            "match but no ip/ notanip /blublu",
            None,
        ),
        # regex matches, the group is no IP and a replacement is set
        (
            "match but no ip/ notanip /blublu",
            re.compile(r"^match but no ip/ (.*) /blublu"),
            "match but no ip/ yeah /blublu",
            "yeah",
        ),
    ],
)
def test_regex(line, regex, expected, replace):
    """Regex based processing rewrites only what the groups point at."""
    anonymizer = anonip.Anonip(regex=regex, replace=replace)
    result = anonymizer.process_line_regex(line)
    assert result == expected


def test_replace():
a = anonip.Anonip(replace="replacement")
assert a.process_line("bla something") == "replacement something"
assert a.process_line_column("bla something") == "replacement something"


def test_delimiter():
a = anonip.Anonip(delimiter=";")
assert (
a.process_line("192.168.100.200;some;string;with;öéäü")
a.process_line_column("192.168.100.200;some;string;with;öéäü")
== "192.168.96.0;some;string;with;öéäü"
)


def test_private():
a = anonip.Anonip(skip_private=True)
assert a.process_line("192.168.100.200") == "192.168.100.200"
assert a.process_line_column("192.168.100.200") == "192.168.100.200"


def test_run(monkeypatch):
@@ -178,6 +225,39 @@ def test_cli_generic_args(args, attribute, expected):
assert getattr(anonip.parse_arguments(args), attribute) == expected


@pytest.mark.parametrize(
    "args,success",
    [
        ([], True),
        (["--regex", "test"], True),
        (["-c", "4"], True),
        (["--regex", "test", "-c", "3"], False),
        (["--regex", "test", "-l", ";"], False),
        (["--regex", "test", "-l", ";", "-c", "4"], False),
    ],
)
def test_cli_args_ambiguity(args, success):
    """Combining "--regex" with "-c" and/or "-l" must exit with code 2."""
    if not success:
        with pytest.raises(SystemExit) as excinfo:
            anonip.parse_arguments(args)
        assert excinfo.value.code == 2
    else:
        # must simply parse without exiting
        anonip.parse_arguments(args)


@pytest.mark.parametrize(
    "args,expected",
    [
        (["--regex", "test"], "test"),
        (["--regex", "foo", "bar", "baz"], "foo|bar|baz"),
    ],
)
def test_regex_concat(args, expected):
    """Multiple "--regex" values end up as one compiled alternation."""
    parsed = anonip.parse_arguments(args)
    expected_pattern = re.compile(expected)
    assert parsed.regex == expected_pattern


@pytest.mark.parametrize(
"value,valid,bits",
[
@@ -207,6 +287,15 @@ def test_cli_validate_integer_ht_0(value, valid):
anonip._validate_integer_ht_0(value)


@pytest.mark.parametrize("value,valid", [("valid (.*)", True), ("\\9", False)])
def test_regex_arg_type(value, valid):
    """Valid regexes pass through unchanged, invalid ones are rejected."""
    if not valid:
        with pytest.raises(argparse.ArgumentTypeError):
            anonip.regex_arg_type(value)
        return

    assert anonip.regex_arg_type(value) == value


@pytest.mark.parametrize("to_file", [False, True])
@pytest.mark.parametrize("debug,log_level", [(False, 30), (True, 10)])
def test_main(

0 comments on commit 228a7c2

Please sign in to comment.