Skip to content

Commit

Permalink
Merge pull request #48 from open-dynaMIX/regex_detection
Browse files Browse the repository at this point in the history
feat(cli): Regex based IP detection
  • Loading branch information
open-dynaMIX authored Dec 26, 2021
2 parents b516ea9 + bf37456 commit 895dacd
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 5 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Using shell redirects, it's also possible to rewrite existing log files.
- Masks IP addresses in log files
- Configurable amount of masked bits
- The column containing the IP address can freely be chosen
- Alternatively, use a regex to point anonip to the location(s) of the IP(s). See [this RFC](https://github.com/DigitaleGesellschaft/Anonip/issues/44) for more information.
- Works for both access.log- and error.log files

## Officially supported python versions
Expand All @@ -57,7 +58,7 @@ For python versions <3.3:
```
usage: anonip.py [-h] [-4 INTEGER] [-6 INTEGER] [-i INTEGER] [-o FILE]
[--input FILE] [-c INTEGER [INTEGER ...]] [-l STRING]
[-r STRING] [-p] [-d] [-v]
[--regex STRING [STRING ...]] [-r STRING] [-p] [-d] [-v]
Anonip is a tool to anonymize IP-addresses in log files.
Expand All @@ -77,13 +78,18 @@ optional arguments:
default: 1)
-l STRING, --delimiter STRING
log delimiter (default: " ")
--regex STRING [STRING ...]
regex for detecting IP addresses (use optionally instead of -c)
-r STRING, --replace STRING
replacement string in case address parsing fails
(Example: 0.0.0.0)
-p, --skip-private do not mask addresses in private ranges. See IANA
Special-Purpose Address Registry.
-d, --debug print debug messages
-v, --version show program's version number and exit
Example-usage in apache-config:
CustomLog "| /path/to/anonip.py [OPTIONS] --output /path/to/log" combined
```

## Usage
Expand Down
83 changes: 79 additions & 4 deletions anonip.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import argparse
import logging
import re
import sys
from io import open

Expand Down Expand Up @@ -78,6 +79,7 @@ def __init__(
increment=0,
delimiter=" ",
replace=None,
regex=None,
skip_private=False,
):
"""
Expand All @@ -98,6 +100,7 @@ def __init__(
self.increment = increment
self.delimiter = delimiter
self.replace = replace
self.regex = regex
self.skip_private = skip_private

@property
Expand Down Expand Up @@ -154,6 +157,7 @@ def run(self, input_file=None):
logger.debug("Got line: %r", line)

yield self.process_line(line)

line = input_file.readline()

def process_ip(self, ip):
Expand All @@ -176,9 +180,36 @@ def process_ip(self, ip):
)
return trunc_ip

def process_line(self, line):
def process_line_regex(self, line):
    """
    Anonymize a single log line using the configured regex.

    The regex is applied with ``re.match``, i.e. anchored at the start of
    the line. When it does not match, the line is returned untouched.
    Otherwise every distinct, non-empty captured group is handed to
    ``extract_ip``: a detected address is replaced by its truncated form,
    anything else is replaced by ``self.replace`` (if one is configured).

    :param line: str
    :return: str
    """
    matched = re.match(self.regex, line)
    if matched is None:
        logger.debug("Regex did not match!")
        return line

    # De-duplicate the captures so identical groups are only handled once;
    # groups that did not participate in the match are None and skipped.
    for captured in set(matched.groups()):
        if not captured:
            continue

        ip_str, ip = self.extract_ip(captured)
        if ip:
            # Replaces every occurrence of the address text in the line.
            line = line.replace(ip_str, str(self.process_ip(ip)))
        elif self.replace:
            line = line.replace(captured, self.replace)

    return line

def process_line_column(self, line):
"""
This function processes a single line based on the provided columns.
It returns the anonymized log line as string.
Expand Down Expand Up @@ -206,6 +237,18 @@ def process_line(self, line):

return self.delimiter.join(loglist)

def process_line(self, line):
    """
    Anonymize a single log line.

    Delegates to the regex based implementation when a regex is configured
    and to the column based implementation otherwise.

    :param line: str
    :return: str
    """
    handler = self.process_line_regex if self.regex else self.process_line_column
    return handler(line)

@staticmethod
def extract_ip(column):
"""
Expand Down Expand Up @@ -297,6 +340,18 @@ def _validate_integer_ht_0(value):
return value


def regex_arg_type(value):
    """
    Argparse ``type`` callable that accepts only compilable regexes.

    The pattern is compiled purely for validation; the original string is
    handed back unchanged.

    :param value: str
    :return: str
    :raises argparse.ArgumentTypeError: if ``value`` is not a valid regex
    """
    try:
        re.compile(value)
    except re.error as exc:
        if hasattr(exc, "msg"):  # pragma: no cover
            # not available on py27
            reason = "must be a valid regex. Error: {}".format(exc.msg)
        else:
            reason = "must be a valid regex."
        raise argparse.ArgumentTypeError(reason)
    return value


def parse_arguments(args):
"""
Parse all given arguments.
Expand Down Expand Up @@ -350,15 +405,20 @@ def parse_arguments(args):
type=lambda x: _validate_integer_ht_0(x),
help="assume IP address is in column n (1-based indexed; default: 1)",
)
parser.set_defaults(column=[1])
parser.add_argument(
"-l",
"--delimiter",
metavar="STRING",
type=str,
help='log delimiter (default: " ")',
)
parser.set_defaults(delimiter=" ")
parser.add_argument(
"--regex",
metavar="STRING",
nargs="+",
help="regex for detecting IP addresses (use optionally instead of -c)",
type=regex_arg_type,
)
parser.add_argument(
"-r",
"--replace",
Expand All @@ -380,6 +440,20 @@ def parse_arguments(args):

args = parser.parse_args(args)

if args.regex and (args.columns is not None or args.delimiter is not None):
raise parser.error(
'Ambiguous arguments: When using "--regex", "-c" and "-l" can\'t be used.'
)
if not args.regex and args.columns is None:
args.columns = [1]
if not args.regex and args.delimiter is None:
args.delimiter = " "
if args.regex:
try:
args.regex = re.compile(r"|".join(args.regex))
except re.error: # pragma: no cover
raise argparse.ArgumentTypeError("Failed to compile concatenated regex!")

return args


Expand All @@ -402,6 +476,7 @@ def main():
args.increment,
args.delimiter,
args.replace,
args.regex,
args.skip_private,
)

Expand Down
91 changes: 91 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import argparse
import logging
import re
import sys
from io import StringIO

Expand Down Expand Up @@ -128,6 +129,54 @@ def test_column(line, columns, expected):
assert a.process_line(line) == expected


@pytest.mark.parametrize(
    "line,regex,expected,replace",
    [
        # Alternation: only the first branch matches an access.log line.
        (
            '3.3.3.3 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
            re.compile(
                r"(?:^([^,]+) - - |.* - somefixedstring: ([^,]+) - .* - ([^,]+))"
            ),
            '3.3.0.0 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
            None,
        ),
        # Single capture group in the middle of the line.
        (
            "blabla/ 3.3.3.3 /blublu",
            re.compile(r"^blabla/ ([^,]+) /blublu"),
            "blabla/ 3.3.0.0 /blublu",
            None,
        ),
        # Several capture groups, each holding an address.
        (
            "1.1.1.1 - somefixedstring: 2.2.2.2 - some random stuff - 3.3.3.3",
            re.compile(r"^([^,]+) - somefixedstring: ([^,]+) - .* - ([^,]+)"),
            "1.1.0.0 - somefixedstring: 2.2.0.0 - some random stuff - 3.3.0.0",
            None,
        ),
        # Regex does not match: line passes through unchanged.
        (
            "some line that doesn't match the provided regex",
            re.compile(r"^([^,]+) - somefixedstring: ([^,]+) - .* - ([^,]+)"),
            "some line that doesn't match the provided regex",
            None,
        ),
        # Regex matches but the capture is no IP and no replacement is set.
        (
            "match but no ip/ notanip /blublu",
            re.compile(r"^match but no ip/ ([^,]+) /blublu"),
            "match but no ip/ notanip /blublu",
            None,
        ),
        # Regex matches, capture is no IP, replacement string is applied.
        (
            "match but no ip/ notanip /blublu",
            re.compile(r"^match but no ip/ ([^,]+) /blublu"),
            "match but no ip/ yeah /blublu",
            "yeah",
        ),
    ],
)
def test_regex(line, regex, expected, replace):
    """Regex based detection anonymizes (or replaces) the captured groups."""
    anonymizer = anonip.Anonip(regex=regex, replace=replace)
    result = anonymizer.process_line(line)
    assert result == expected


def test_replace():
a = anonip.Anonip(replace="replacement")
assert a.process_line("bla something") == "replacement something"
Expand Down Expand Up @@ -178,6 +227,39 @@ def test_cli_generic_args(args, attribute, expected):
assert getattr(anonip.parse_arguments(args), attribute) == expected


@pytest.mark.parametrize(
    "args,success",
    [
        ([], True),
        (["--regex", "test"], True),
        (["-c", "4"], True),
        (["--regex", "test", "-c", "3"], False),
        (["--regex", "test", "-l", ";"], False),
        (["--regex", "test", "-l", ";", "-c", "4"], False),
    ],
)
def test_cli_args_ambiguity(args, success):
    """``--regex`` must be rejected when combined with ``-c`` and/or ``-l``."""
    if not success:
        with pytest.raises(SystemExit) as excinfo:
            anonip.parse_arguments(args)
        # Checked after the context manager so the assertion actually runs.
        assert excinfo.value.code == 2
        return

    # Valid combinations only need to parse without exiting.
    anonip.parse_arguments(args)


@pytest.mark.parametrize(
    "args,expected",
    [
        (["--regex", "test"], "test"),
        (["--regex", "foo", "bar", "baz"], "foo|bar|baz"),
    ],
)
def test_regex_concat(args, expected):
    """Multiple ``--regex`` values are joined with ``|`` and compiled."""
    parsed = anonip.parse_arguments(args)
    assert parsed.regex == re.compile(expected)


@pytest.mark.parametrize(
"value,valid,bits",
[
Expand Down Expand Up @@ -207,6 +289,15 @@ def test_cli_validate_integer_ht_0(value, valid):
anonip._validate_integer_ht_0(value)


@pytest.mark.parametrize("value,valid", [("valid (.*)", True), ("\\9", False)])
def test_regex_arg_type(value, valid):
    """``regex_arg_type`` returns valid patterns and rejects invalid ones."""
    if not valid:
        with pytest.raises(argparse.ArgumentTypeError):
            anonip.regex_arg_type(value)
        return

    assert anonip.regex_arg_type(value) == value


@pytest.mark.parametrize("to_file", [False, True])
@pytest.mark.parametrize("debug,log_level", [(False, 30), (True, 10)])
def test_main(
Expand Down

0 comments on commit 895dacd

Please sign in to comment.