# cpe.py -- forked from MISP/misp-modules
import json
import requests
from . import check_input_attribute, standard_error_message
from pymisp import MISPEvent, MISPObject
# Error container; not referenced by the code visible in this file.
misperrors = {'error': 'Error'}

# Declares the accepted input attribute type and the format of the results.
mispattributes = {
    'input': ['cpe'],
    'format': 'misp_standard',
}

# Module metadata; version() attaches the 'config' entry before returning it.
moduleinfo = {
    'version': '2',
    'author': 'Christian Studer',
    'description': 'An expansion module to enrich a CPE attribute with its related vulnerabilities.',
    'module-type': ['expansion', 'hover'],
}

# Names of the configuration values read by handler().
moduleconfig = ['custom_API_URL', 'limit']

# Endpoint queried when no custom_API_URL is configured.
cveapi_url = 'https://cvepremium.circl.lu/api/query'

# Number of results requested when no limit is configured.
DEFAULT_LIMIT = 10
class VulnerabilitiesParser:
    """Turn vulnerability records into MISP ``vulnerability`` objects.

    The parser wraps the queried attribute in a fresh ``MISPEvent`` and adds
    one ``vulnerability`` object per record, each referencing the queried
    attribute with a ``related-to`` relationship.
    """

    # Maps a field of a vulnerability record to the MISP attribute type and
    # object relation used to store it.  The table is constant and never
    # mutated, so it is shared by all instances.
    vulnerability_mapping = {
        'id': {
            'type': 'vulnerability',
            'object_relation': 'id'
        },
        'summary': {
            'type': 'text',
            'object_relation': 'summary'
        },
        'vulnerable_configuration': {
            'type': 'cpe',
            'object_relation': 'vulnerable-configuration'
        },
        'vulnerable_configuration_cpe_2_2': {
            'type': 'cpe',
            'object_relation': 'vulnerable-configuration'
        },
        'Modified': {
            'type': 'datetime',
            'object_relation': 'modified'
        },
        'Published': {
            'type': 'datetime',
            'object_relation': 'published'
        },
        'references': {
            'type': 'link',
            'object_relation': 'references'
        },
        'cvss': {
            'type': 'float',
            'object_relation': 'cvss-score'
        }
    }

    # Record fields holding a single value.
    _single_value_features = ('id', 'summary', 'Modified', 'Published', 'cvss')
    # Record fields holding a list of values.
    _multi_value_features = (
        'references',
        'vulnerable_configuration',
        'vulnerable_configuration_cpe_2_2',
    )

    def __init__(self, attribute):
        """Store the queried attribute and add it to a new event.

        Args:
            attribute: dict describing the input attribute; it is unpacked
                into ``MISPEvent.add_attribute`` and its ``'uuid'`` is used
                later as the target of object references.
        """
        self.attribute = attribute
        self.misp_event = MISPEvent()
        self.misp_event.add_attribute(**attribute)

    def parse_vulnerabilities(self, vulnerabilities):
        """Add one ``vulnerability`` object to the event per record.

        Args:
            vulnerabilities: iterable of dict records.  Fields that are
                missing or falsy are skipped.
        """
        for vulnerability in vulnerabilities:
            vulnerability_object = MISPObject('vulnerability')
            for feature in self._single_value_features:
                if vulnerability.get(feature):
                    attribute = {'value': vulnerability[feature]}
                    attribute.update(self.vulnerability_mapping[feature])
                    vulnerability_object.add_attribute(**attribute)
            # A record carrying a publication date is flagged as published.
            if vulnerability.get('Published'):
                vulnerability_object.add_attribute(**{
                    'type': 'text',
                    'object_relation': 'state',
                    'value': 'Published'
                })
            for feature in self._multi_value_features:
                if not vulnerability.get(feature):
                    continue
                for value in vulnerability[feature]:
                    # Entries may be plain strings or dicts; for dicts the
                    # 'title' field is the value that gets stored.
                    if isinstance(value, dict):
                        value = value.get('title')
                    # Skip entries without a usable value instead of letting
                    # one malformed entry abort the whole enrichment.
                    if not value:
                        continue
                    attribute = {'value': value}
                    attribute.update(self.vulnerability_mapping[feature])
                    vulnerability_object.add_attribute(**attribute)
            vulnerability_object.add_reference(self.attribute['uuid'], 'related-to')
            self.misp_event.add_object(vulnerability_object)

    def get_result(self):
        """Return the event's attributes and objects as a results dict.

        Returns:
            ``{'results': {...}}`` holding the ``'Attribute'`` and
            ``'Object'`` entries of the serialised event.  An entry that is
            absent or empty in the serialised event (for instance when no
            object was added) is omitted rather than raising ``KeyError``.
        """
        event = json.loads(self.misp_event.to_json())
        results = {
            key: event[key]
            for key in ('Attribute', 'Object')
            if event.get(key)
        }
        return {'results': results}
def check_url(url):
    """Return *url* stripped of surrounding whitespace and ending with '/'.

    The value comes from user-supplied configuration, so stray whitespace is
    removed first; otherwise the slash would be appended after the blanks
    and yield a malformed URL.

    Args:
        url: URL string to normalise.

    Returns:
        The stripped URL, with a trailing slash appended when it had none.
    """
    url = url.strip()
    return url if url.endswith('/') else f"{url}/"
# Seconds to wait for the CVE API; requests applies no timeout by default,
# so a stalled server would otherwise block the module indefinitely.
_REQUEST_TIMEOUT = 30


def handler(q=False):
    """Enrich a CPE attribute with the vulnerabilities related to it.

    Args:
        q: JSON string holding the query, with an ``'attribute'`` dict and
            an optional ``'config'`` dict (``custom_API_URL``, ``limit``).
            ``False`` means no query was given.

    Returns:
        ``False`` when *q* is ``False``; a ``{'error': message}`` dict when
        the input is invalid, the API cannot be reached or it returns no
        usable data; otherwise the ``{'results': ...}`` dict built by
        ``VulnerabilitiesParser``.
    """
    if q is False:
        return False
    request = json.loads(q)
    if not request.get('attribute') or not check_input_attribute(request['attribute']):
        return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
    attribute = request['attribute']
    if attribute.get('type') != 'cpe':
        return {'error': 'Wrong input attribute type.'}

    # The configuration is optional: fall back to the defaults when the
    # query carries no 'config' entry (or an empty one).
    config = request.get('config') or {}
    url = check_url(config['custom_API_URL']) if config.get('custom_API_URL') else cveapi_url
    try:
        limit = int(config['limit']) if config.get('limit') else DEFAULT_LIMIT
    except (TypeError, ValueError):
        return {'error': 'The limit configuration value must be an integer.'}

    params = {
        "retrieve": "cves",
        "dict_filter": {
            "vulnerable_configuration": attribute['value']
        },
        "limit": limit,
        "sort": "cvss",
        "sort_dir": "DESC"
    }
    try:
        response = requests.post(url, json=params, timeout=_REQUEST_TIMEOUT)
    except requests.exceptions.RequestException:
        # Connection failures, timeouts and invalid URLs all mean the same
        # thing to the caller: the API could not be queried.
        return {'error': 'API not accessible.'}
    if response.status_code != 200:
        return {'error': 'API not accessible.'}

    try:
        payload = response.json()
    except ValueError:
        return {'error': 'Unexpected answer from the API: the response is not valid JSON.'}
    vulnerabilities = payload.get('data') if isinstance(payload, dict) else None
    if not vulnerabilities:
        return {'error': 'No related vulnerability for this CPE.'}

    parser = VulnerabilitiesParser(attribute)
    parser.parse_vulnerabilities(vulnerabilities)
    return parser.get_result()
def introspection():
    """Return the module-level ``mispattributes`` declaration."""
    declared_attributes = mispattributes
    return declared_attributes
def version():
    """Return the module metadata with the configuration names attached.

    Stores ``moduleconfig`` under the ``'config'`` key of the module-level
    ``moduleinfo`` dict (modifying it in place) and returns that same dict.
    """
    moduleinfo.update({'config': moduleconfig})
    return moduleinfo