Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend MMDB with max_country_info_qt #697

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 32 additions & 34 deletions misp_modules/modules/expansion/mmdb_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,22 @@

misperrors = {'error': 'Error'}
mispattributes = {'input': ['ip-src', 'ip-src|port', 'ip-dst', 'ip-dst|port'], 'format': 'misp_standard'}
moduleinfo = {
'version': '1',
'author': 'Jeroen Pinoy',
'description': "A hover and expansion module to enrich an ip with geolocation and ASN information from an mmdb server instance, such as CIRCL's ip.circl.lu.",
'module-type': ['expansion', 'hover'],
'name': 'GeoIP Enrichment',
'logo': 'circl.png',
'requirements': [],
'features': 'The module takes an IP address related attribute as input.\n It queries the public CIRCL.lu mmdb-server instance, available at ip.circl.lu, by default. The module can be configured with a custom mmdb server url if required.\n It is also possible to filter results on 1 db_source by configuring db_source_filter.',
'references': ['https://data.public.lu/fr/datasets/geo-open-ip-address-geolocation-per-country-in-mmdb-format/', 'https://github.com/adulau/mmdb-server'],
'input': 'An IP address attribute (for example ip-src or ip-src|port).',
'output': 'Geolocation and asn objects.',
}
moduleconfig = ["custom_API", "db_source_filter"]
moduleinfo = {'version': '1', 'author': 'Jeroen Pinoy',
'description': "An expansion module to enrich an ip with geolocation and asn information from an mmdb server "
"such as ip.circl.lu.",
'module-type': ['expansion', 'hover']}
moduleconfig = ["custom_API", "db_source_filter", "max_country_info_qt"]
mmdblookup_url = 'https://ip.circl.lu/'


class MmdbLookupParser():
def __init__(self, attribute, mmdblookupresult, api_url):
def __init__(self, attribute, mmdblookupresult, api_url, max_country_info_qt=0):
self.attribute = attribute
self.mmdblookupresult = mmdblookupresult
self.api_url = api_url
self.misp_event = MISPEvent()
self.misp_event.add_attribute(**attribute)
self.max_country_info_qt = int(max_country_info_qt)

def get_result(self):
event = json.loads(self.misp_event.to_json())
Expand All @@ -37,26 +29,29 @@ def get_result(self):

def parse_mmdblookup_information(self):
# There is a chance some db's have a hit while others don't so we have to check if entry is empty each time
country_info_qt = 0
for result_entry in self.mmdblookupresult:
if result_entry['country_info']:
mmdblookup_object = MISPObject('geolocation')
mmdblookup_object.add_attribute('country',
**{'type': 'text', 'value': result_entry['country_info']['Country']})
mmdblookup_object.add_attribute('countrycode',
**{'type': 'text', 'value': result_entry['country']['iso_code']})
mmdblookup_object.add_attribute('latitude',
**{'type': 'float',
'value': result_entry['country_info']['Latitude (average)']})
mmdblookup_object.add_attribute('longitude',
**{'type': 'float',
'value': result_entry['country_info']['Longitude (average)']})
mmdblookup_object.add_attribute('text',
**{'type': 'text',
'value': 'db_source: {}. build_db: {}. Latitude and longitude are country average.'.format(
result_entry['meta']['db_source'],
result_entry['meta']['build_db'])})
mmdblookup_object.add_reference(self.attribute['uuid'], 'related-to')
self.misp_event.add_object(mmdblookup_object)
if (self.max_country_info_qt == 0) or (self.max_country_info_qt > 0 and country_info_qt < self.max_country_info_qt):
mmdblookup_object = MISPObject('geolocation')
mmdblookup_object.add_attribute('country',
**{'type': 'text', 'value': result_entry['country_info']['Country']})
mmdblookup_object.add_attribute('countrycode',
**{'type': 'text', 'value': result_entry['country']['iso_code']})
mmdblookup_object.add_attribute('latitude',
**{'type': 'float',
'value': result_entry['country_info']['Latitude (average)']})
mmdblookup_object.add_attribute('longitude',
**{'type': 'float',
'value': result_entry['country_info']['Longitude (average)']})
mmdblookup_object.add_attribute('text',
**{'type': 'text',
'value': 'db_source: {}. build_db: {}. Latitude and longitude are country average.'.format(
result_entry['meta']['db_source'],
result_entry['meta']['build_db'])})
mmdblookup_object.add_reference(self.attribute['uuid'], 'related-to')
self.misp_event.add_object(mmdblookup_object)
country_info_qt += 1
if 'AutonomousSystemNumber' in result_entry['country']:
mmdblookup_object_asn = MISPObject('asn')
mmdblookup_object_asn.add_attribute('asn',
Expand Down Expand Up @@ -96,6 +91,9 @@ def handler(q=False):
else:
misperrors['error'] = 'There is no attribute of type ip-src or ip-dst provided as input'
return misperrors
max_country_info_qt = request['config'].get('max_country_info_qt', 0)
if max_country_info_qt is None:
max_country_info_qt = 0
api_url = check_url(request['config']['custom_API']) if 'config' in request and request['config'].get(
'custom_API') else mmdblookup_url
r = requests.get("{}/geolookup/{}".format(api_url, toquery))
Expand Down Expand Up @@ -123,7 +121,7 @@ def handler(q=False):
else:
misperrors['error'] = 'API not accessible - http status code {} was returned'.format(r.status_code)
return misperrors
parser = MmdbLookupParser(attribute, mmdblookupresult, api_url)
parser = MmdbLookupParser(attribute, mmdblookupresult, api_url, max_country_info_qt)
parser.parse_mmdblookup_information()
result = parser.get_result()
return result
Expand Down
Loading