From 4c6a215802ec5bac6c7569e283b2eeb7d0f667cf Mon Sep 17 00:00:00 2001 From: Koen Van Impe Date: Fri, 18 Oct 2024 22:00:46 +0200 Subject: [PATCH 1/2] Extend MMDB with max_country_qt When querying MMDB there are sometimes multiple country_info objects returned, mostly due to the different db_source. Sometimes customers are not interested in the db_source, and only the geo-info. This change adds max_country_qt. When - Set to None or 0, has no effect - Set to a value higher than 0, the number of country_info entries is limited to max_country_qt --- misp_modules/modules/expansion/mmdb_lookup.py | 66 +++++++++---------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/misp_modules/modules/expansion/mmdb_lookup.py b/misp_modules/modules/expansion/mmdb_lookup.py index 21848563..a191dfb5 100644 --- a/misp_modules/modules/expansion/mmdb_lookup.py +++ b/misp_modules/modules/expansion/mmdb_lookup.py @@ -5,30 +5,22 @@ misperrors = {'error': 'Error'} mispattributes = {'input': ['ip-src', 'ip-src|port', 'ip-dst', 'ip-dst|port'], 'format': 'misp_standard'} -moduleinfo = { - 'version': '1', - 'author': 'Jeroen Pinoy', - 'description': "A hover and expansion module to enrich an ip with geolocation and ASN information from an mmdb server instance, such as CIRCL's ip.circl.lu.", - 'module-type': ['expansion', 'hover'], - 'name': 'GeoIP Enrichment', - 'logo': 'circl.png', - 'requirements': [], - 'features': 'The module takes an IP address related attribute as input.\n It queries the public CIRCL.lu mmdb-server instance, available at ip.circl.lu, by default. The module can be configured with a custom mmdb server url if required.\n It is also possible to filter results on 1 db_source by configuring db_source_filter.', - 'references': ['https://data.public.lu/fr/datasets/geo-open-ip-address-geolocation-per-country-in-mmdb-format/', 'https://github.com/adulau/mmdb-server'], - 'input': 'An IP address attribute (for example ip-src or ip-src|port).', - 'output': 'Geolocation and asn objects.', -} -moduleconfig = ["custom_API", "db_source_filter"] +moduleinfo = {'version': '1', 'author': 'Jeroen Pinoy', + 'description': "An expansion module to enrich an ip with geolocation and asn information from an mmdb server " + "such as ip.circl.lu.", + 'module-type': ['expansion', 'hover']} +moduleconfig = ["custom_API", "db_source_filter", "max_country_qt"] mmdblookup_url = 'https://ip.circl.lu/' class MmdbLookupParser(): - def __init__(self, attribute, mmdblookupresult, api_url): + def __init__(self, attribute, mmdblookupresult, api_url, max_country_qt=0): self.attribute = attribute self.mmdblookupresult = mmdblookupresult self.api_url = api_url self.misp_event = MISPEvent() self.misp_event.add_attribute(**attribute) + self.max_country_qt = int(max_country_qt) def get_result(self): event = json.loads(self.misp_event.to_json()) @@ -37,26 +29,29 @@ def get_result(self): def parse_mmdblookup_information(self): # There is a chance some db's have a hit while others don't so we have to check if entry is empty each time + country_info_qt = 0 for result_entry in self.mmdblookupresult: if result_entry['country_info']: - mmdblookup_object = MISPObject('geolocation') - mmdblookup_object.add_attribute('country', - **{'type': 'text', 'value': result_entry['country_info']['Country']}) - mmdblookup_object.add_attribute('countrycode', - **{'type': 'text', 'value': result_entry['country']['iso_code']}) - mmdblookup_object.add_attribute('latitude', - **{'type': 'float', - 'value': result_entry['country_info']['Latitude (average)']}) - mmdblookup_object.add_attribute('longitude', - **{'type': 'float', - 'value': result_entry['country_info']['Longitude (average)']}) - mmdblookup_object.add_attribute('text', - **{'type': 'text', - 'value': 'db_source: {}. build_db: {}. Latitude and longitude are country average.'.format( - result_entry['meta']['db_source'], - result_entry['meta']['build_db'])}) - mmdblookup_object.add_reference(self.attribute['uuid'], 'related-to') - self.misp_event.add_object(mmdblookup_object) + if (self.max_country_qt == 0) or (self.max_country_qt > 0 and country_info_qt < self.max_country_qt): + mmdblookup_object = MISPObject('geolocation') + mmdblookup_object.add_attribute('country', + **{'type': 'text', 'value': result_entry['country_info']['Country']}) + mmdblookup_object.add_attribute('countrycode', + **{'type': 'text', 'value': result_entry['country']['iso_code']}) + mmdblookup_object.add_attribute('latitude', + **{'type': 'float', + 'value': result_entry['country_info']['Latitude (average)']}) + mmdblookup_object.add_attribute('longitude', + **{'type': 'float', + 'value': result_entry['country_info']['Longitude (average)']}) + mmdblookup_object.add_attribute('text', + **{'type': 'text', + 'value': 'db_source: {}. build_db: {}. Latitude and longitude are country average.'.format( + result_entry['meta']['db_source'], + result_entry['meta']['build_db'])}) + mmdblookup_object.add_reference(self.attribute['uuid'], 'related-to') + self.misp_event.add_object(mmdblookup_object) + country_info_qt += 1 if 'AutonomousSystemNumber' in result_entry['country']: mmdblookup_object_asn = MISPObject('asn') mmdblookup_object_asn.add_attribute('asn', @@ -96,6 +91,9 @@ def handler(q=False): else: misperrors['error'] = 'There is no attribute of type ip-src or ip-dst provided as input' return misperrors + max_country_qt = request['config'].get('max_country_qt', 0) + if max_country_qt is None: + max_country_qt = 0 api_url = check_url(request['config']['custom_API']) if 'config' in request and request['config'].get( 'custom_API') else mmdblookup_url r = requests.get("{}/geolookup/{}".format(api_url, toquery)) @@ -123,7 +121,7 @@ def handler(q=False): else: misperrors['error'] = 'API not accessible - http status code {} was returned'.format(r.status_code) return misperrors - parser = MmdbLookupParser(attribute, mmdblookupresult, api_url) + parser = MmdbLookupParser(attribute, mmdblookupresult, api_url, max_country_qt) parser.parse_mmdblookup_information() result = parser.get_result() return result From fe2f2acd422c107ad9c1d9588d498a141c88fc0f Mon Sep 17 00:00:00 2001 From: Koen Van Impe Date: Fri, 18 Oct 2024 22:04:38 +0200 Subject: [PATCH 2/2] Be more consistent with max_country_qt / max_country_info_qt --- misp_modules/modules/expansion/mmdb_lookup.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/misp_modules/modules/expansion/mmdb_lookup.py b/misp_modules/modules/expansion/mmdb_lookup.py index a191dfb5..9b25a960 100644 --- a/misp_modules/modules/expansion/mmdb_lookup.py +++ b/misp_modules/modules/expansion/mmdb_lookup.py @@ -9,18 +9,18 @@ 'description': "An expansion module to enrich an ip with geolocation and asn information from an mmdb server " "such as ip.circl.lu.", 'module-type': ['expansion', 'hover']} -moduleconfig = ["custom_API", "db_source_filter", "max_country_qt"] +moduleconfig = ["custom_API", "db_source_filter", "max_country_info_qt"] mmdblookup_url = 'https://ip.circl.lu/' class MmdbLookupParser(): - def __init__(self, attribute, mmdblookupresult, api_url, max_country_qt=0): + def __init__(self, attribute, mmdblookupresult, api_url, max_country_info_qt=0): self.attribute = attribute self.mmdblookupresult = mmdblookupresult self.api_url = api_url self.misp_event = MISPEvent() self.misp_event.add_attribute(**attribute) - self.max_country_qt = int(max_country_qt) + self.max_country_info_qt = int(max_country_info_qt) def get_result(self): event = json.loads(self.misp_event.to_json()) @@ -32,7 +32,7 @@ def parse_mmdblookup_information(self): country_info_qt = 0 for result_entry in self.mmdblookupresult: if result_entry['country_info']: - if (self.max_country_qt == 0) or (self.max_country_qt > 0 and country_info_qt < self.max_country_qt): + if (self.max_country_info_qt == 0) or (self.max_country_info_qt > 0 and country_info_qt < self.max_country_info_qt): mmdblookup_object = MISPObject('geolocation') mmdblookup_object.add_attribute('country', **{'type': 'text', 'value': result_entry['country_info']['Country']}) @@ -91,9 +91,9 @@ def handler(q=False): else: misperrors['error'] = 'There is no attribute of type ip-src or ip-dst provided as input' return misperrors - max_country_qt = request['config'].get('max_country_qt', 0) - if max_country_qt is None: - max_country_qt = 0 + max_country_info_qt = request['config'].get('max_country_info_qt', 0) + if max_country_info_qt is None: + max_country_info_qt = 0 api_url = check_url(request['config']['custom_API']) if 'config' in request and request['config'].get( 'custom_API') else mmdblookup_url r = requests.get("{}/geolookup/{}".format(api_url, toquery)) @@ -121,7 +121,7 @@ def handler(q=False): else: misperrors['error'] = 'API not accessible - http status code {} was returned'.format(r.status_code) return misperrors - parser = MmdbLookupParser(attribute, mmdblookupresult, api_url, max_country_qt) + parser = MmdbLookupParser(attribute, mmdblookupresult, api_url, max_country_info_qt) parser.parse_mmdblookup_information() result = parser.get_result() return result