From bf3745692a6acca9e99c2845c9f03d26861725ba Mon Sep 17 00:00:00 2001 From: Fabio Ambauen <1833932+open-dynaMIX@users.noreply.github.com> Date: Mon, 19 Oct 2020 16:05:33 +0200 Subject: [PATCH] feat(cli): Regex based IP detection This commit implements regex based IP detection. This is intended for use with logfiles where column based detection doesn't work. See RFC (#44) for more information. Closes #42, closes #44 --- README.md | 8 ++++- anonip.py | 83 +++++++++++++++++++++++++++++++++++++++++++++++--- tests.py | 91 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 177 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 9532467..0947cd0 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ Using shell redirects, it's also possible to rewrite existing log files. - Masks IP addresses in log files - Configurable amount of masked bits - The column containing the IP address can freely be chosen + - Alternatively use a regex to point anonip to the location(s) of the IP(s). See [this RFC](https://github.com/DigitaleGesellschaft/Anonip/issues/44) for more information. - Works for both access.log- and error.log files ## Officially supported python versions @@ -57,7 +58,7 @@ For python versions <3.3: ``` usage: anonip.py [-h] [-4 INTEGER] [-6 INTEGER] [-i INTEGER] [-o FILE] [--input FILE] [-c INTEGER [INTEGER ...]] [-l STRING] - [-r STRING] [-p] [-d] [-v] + [--regex STRING [STRING ...]] [-r STRING] [-p] [-d] [-v] Anonip is a tool to anonymize IP-addresses in log files. @@ -77,6 +78,8 @@ optional arguments: default: 1) -l STRING, --delimiter STRING log delimiter (default: " ") + --regex STRING [STRING ...] + regex for detecting IP addresses (use instead of -c) -r STRING, --replace STRING replacement string in case address parsing fails (Example: 0.0.0.0) @@ -84,6 +87,9 @@ optional arguments: Special-Purpose Address Registry. 
-d, --debug print debug messages -v, --version show program's version number and exit + +Example-usage in apache-config: +CustomLog "| /path/to/anonip.py [OPTIONS] --output /path/to/log" combined ``` ## Usage diff --git a/anonip.py b/anonip.py index 52ac7c2..2850784 100755 --- a/anonip.py +++ b/anonip.py @@ -41,6 +41,7 @@ import argparse import logging +import re import sys from io import open @@ -78,6 +79,7 @@ def __init__( increment=0, delimiter=" ", replace=None, + regex=None, skip_private=False, ): """ @@ -98,6 +100,7 @@ def __init__( self.increment = increment self.delimiter = delimiter self.replace = replace + self.regex = regex self.skip_private = skip_private @property @@ -154,6 +157,7 @@ def run(self, input_file=None): logger.debug("Got line: %r", line) yield self.process_line(line) + line = input_file.readline() def process_ip(self, ip): @@ -176,9 +180,36 @@ def process_ip(self, ip): ) return trunc_ip - def process_line(self, line): + def process_line_regex(self, line): """ - This function processes a single line. + This function processes a single line based on the provided regex. + + It returns the anonymized log line as string. + + :param line: str + :return: str + """ + match = re.match(self.regex, line) + if not match: + logger.debug("Regex did not match!") + return line + groups = match.groups() + + for m in set(groups): + if not m: + continue + ip_str, ip = self.extract_ip(m) + if ip: + trunc_ip = self.process_ip(ip) + line = line.replace(ip_str, str(trunc_ip)) + elif self.replace: + line = line.replace(m, self.replace) + + return line + + def process_line_column(self, line): + """ + This function processes a single line based on the provided columns. It returns the anonymized log line as string. @@ -206,6 +237,18 @@ def process_line(self, line): return self.delimiter.join(loglist) + def process_line(self, line): + """ + This function processes a single line. + It returns the anonymized log line as string. 
+ + :param line: str + :return: str + """ + if self.regex: + return self.process_line_regex(line) + return self.process_line_column(line) + @staticmethod def extract_ip(column): """ @@ -297,6 +340,18 @@ def _validate_integer_ht_0(value): return value +def regex_arg_type(value): + try: + re.compile(value) + except re.error as e: + msg = "must be a valid regex." + if hasattr(e, "msg"): # pragma: no cover + # not available on py27 + msg = "must be a valid regex. Error: {}".format(e.msg) + raise argparse.ArgumentTypeError(msg) + return value + + def parse_arguments(args): """ Parse all given arguments. @@ -350,7 +405,6 @@ def parse_arguments(args): type=lambda x: _validate_integer_ht_0(x), help="assume IP address is in column n (1-based indexed; default: 1)", ) - parser.set_defaults(column=[1]) parser.add_argument( "-l", "--delimiter", @@ -358,7 +412,13 @@ def parse_arguments(args): type=str, help='log delimiter (default: " ")', ) - parser.set_defaults(delimiter=" ") + parser.add_argument( + "--regex", + metavar="STRING", + nargs="+", + help="regex for detecting IP addresses (use optionally instead of -c)", + type=regex_arg_type, + ) parser.add_argument( "-r", "--replace", @@ -380,6 +440,20 @@ def parse_arguments(args): args = parser.parse_args(args) + if args.regex and (args.columns is not None or args.delimiter is not None): + raise parser.error( + 'Ambiguous arguments: When using "--regex", "-c" and "-l" can\'t be used.' 
+ ) + if not args.regex and args.columns is None: + args.columns = [1] + if not args.regex and args.delimiter is None: + args.delimiter = " " + if args.regex: + try: + args.regex = re.compile(r"|".join(args.regex)) + except re.error: # pragma: no cover + raise argparse.ArgumentTypeError("Failed to compile concatenated regex!") + return args @@ -402,6 +476,7 @@ def main(): args.increment, args.delimiter, args.replace, + args.regex, args.skip_private, ) diff --git a/tests.py b/tests.py index 9d16508..cb07d6a 100755 --- a/tests.py +++ b/tests.py @@ -10,6 +10,7 @@ import argparse import logging +import re import sys from io import StringIO @@ -128,6 +129,54 @@ def test_column(line, columns, expected): assert a.process_line(line) == expected +@pytest.mark.parametrize( + "line,regex,expected,replace", + [ + ( + '3.3.3.3 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"', + re.compile( + r"(?:^([^,]+) - - |.* - somefixedstring: ([^,]+) - .* - ([^,]+))" + ), + '3.3.0.0 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"', + None, + ), + ( + "blabla/ 3.3.3.3 /blublu", + re.compile(r"^blabla/ ([^,]+) /blublu"), + "blabla/ 3.3.0.0 /blublu", + None, + ), + ( + "1.1.1.1 - somefixedstring: 2.2.2.2 - some random stuff - 3.3.3.3", + re.compile(r"^([^,]+) - somefixedstring: ([^,]+) - .* - ([^,]+)"), + "1.1.0.0 - somefixedstring: 2.2.0.0 - some random stuff - 3.3.0.0", + None, + ), + ( + "some line that doesn't match the provided regex", + re.compile(r"^([^,]+) - somefixedstring: ([^,]+) - .* - ([^,]+)"), + "some line that doesn't match the provided regex", + None, + ), + ( + "match but no ip/ notanip /blublu", + re.compile(r"^match but no ip/ ([^,]+) /blublu"), + "match but no ip/ notanip /blublu", + None, + ), + ( + "match but no ip/ notanip /blublu", + re.compile(r"^match but no ip/ ([^,]+) /blublu"), + "match but no ip/ yeah /blublu", + "yeah", + ), + ], +) +def test_regex(line, regex, expected, replace): + a = 
anonip.Anonip(regex=regex, replace=replace) + assert a.process_line(line) == expected + + def test_replace(): a = anonip.Anonip(replace="replacement") assert a.process_line("bla something") == "replacement something" @@ -178,6 +227,39 @@ def test_cli_generic_args(args, attribute, expected): assert getattr(anonip.parse_arguments(args), attribute) == expected +@pytest.mark.parametrize( + "args,success", + [ + ([], True), + (["--regex", "test"], True), + (["-c", "4"], True), + (["--regex", "test", "-c", "3"], False), + (["--regex", "test", "-l", ";"], False), + (["--regex", "test", "-l", ";", "-c", "4"], False), + ], +) +def test_cli_args_ambiguity(args, success): + if success: + anonip.parse_arguments(args) + return + + with pytest.raises(SystemExit) as e: + anonip.parse_arguments(args) + assert e.value.code == 2 + + +@pytest.mark.parametrize( + "args,expected", + [ + (["--regex", "test"], "test"), + (["--regex", "foo", "bar", "baz"], "foo|bar|baz"), + ], +) +def test_regex_concat(args, expected): + args = anonip.parse_arguments(args) + assert args.regex == re.compile(expected) + + @pytest.mark.parametrize( "value,valid,bits", [ @@ -207,6 +289,15 @@ def test_cli_validate_integer_ht_0(value, valid): anonip._validate_integer_ht_0(value) +@pytest.mark.parametrize("value,valid", [("valid (.*)", True), ("\\9", False)]) +def test_regex_arg_type(value, valid): + if valid: + assert anonip.regex_arg_type(value) == value + else: + with pytest.raises(argparse.ArgumentTypeError): + anonip.regex_arg_type(value) + + @pytest.mark.parametrize("to_file", [False, True]) @pytest.mark.parametrize("debug,log_level", [(False, 30), (True, 10)]) def test_main(