forked from google/vanir
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvulnerability.py
204 lines (168 loc) · 7.08 KB
/
vulnerability.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# Copyright 2023 Google LLC
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Manage OSV Vulnerabilities and their related data structures."""
from collections.abc import MutableMapping, MutableSequence
import copy
import dataclasses
import enum
import functools
import json
import re
from typing import Any, Dict, Mapping, Optional, Sequence, Union
from google.protobuf import json_format
from osv import vulnerability_pb2
from vanir import signature
OSV_AFFECTED = 'affected'
OSV_PACKAGE = 'package'
OSV_PACKAGE_NAME = 'name'
OSV_ECOSYSTEM = 'ecosystem'
OSV_VERSIONS = 'versions'
OSV_ECOSYSTEM_SPECIFIC = 'ecosystem_specific'
OSV_VANIR_SIGNATURES = 'vanir_signatures'
OSV_SIGNATURE_ID = 'id'
@enum.unique
class MetaPackage(enum.Enum):
"""Special set of OSV packages.
An actual package name of a metapacakge can contain postfix strings.
E.g., ":linux_kernel:", ":linux_kernel:Qualcomm", ":linux_kernel:Mediatek".
"""
UNKNOWN = ':unknown:'
ANDROID_KERNEL = ':linux_kernel:'
ANDROID_MODEM = ':modem:'
@functools.cached_property
def package_pattern(self) -> re.Pattern[str]:
"""Returns regex pattern of packages belonging to the meta-package."""
pattern = re.escape(self.value) + '.*'
return re.compile(pattern)
class AffectedEntry:
"""Class for representing an affected package entry in OSV format."""
def __init__(self, osv_affected: Mapping[str, Any]):
if (
OSV_PACKAGE not in osv_affected
or OSV_ECOSYSTEM not in osv_affected[OSV_PACKAGE]
or not osv_affected[OSV_PACKAGE][OSV_ECOSYSTEM]
or OSV_PACKAGE_NAME not in osv_affected[OSV_PACKAGE]
or not osv_affected[OSV_PACKAGE][OSV_PACKAGE_NAME]
):
raise ValueError(f'Missing package info: {osv_affected}')
self._osv_affected: Dict[str, Any] = dict(copy.deepcopy(osv_affected))
if OSV_ECOSYSTEM_SPECIFIC not in self._osv_affected:
self._osv_affected[OSV_ECOSYSTEM_SPECIFIC] = {}
if OSV_VANIR_SIGNATURES not in self._osv_affected[OSV_ECOSYSTEM_SPECIFIC]:
self._osv_affected[OSV_ECOSYSTEM_SPECIFIC][OSV_VANIR_SIGNATURES] = []
@property
def ecosystem(self) -> str:
"""Returns the ecosystem of the affected package."""
return self._osv_affected[OSV_PACKAGE][OSV_ECOSYSTEM]
@ecosystem.setter
def ecosystem(self, value: str):
self._osv_affected[OSV_PACKAGE][OSV_ECOSYSTEM] = value
@property
def osv_package_name(self) -> str:
"""Returns the raw OSV package name of the affected package."""
return self._osv_affected[OSV_PACKAGE][OSV_PACKAGE_NAME]
@property
def package_name(self) -> str:
"""Returns the normalized package name of the affected package.
Usually, an OSV package name is the package name we want to use. However,
there are special packages, e.g. ":linux_kernel:Qualcomm" or
":linux_kernel:Mediatek", which we normalize to a MetaPackage
e.g. ":linux_kernel:".
"""
for meta_package in MetaPackage:
if self.osv_package_name.startswith(meta_package.value):
return meta_package.value
return self.osv_package_name
@osv_package_name.setter
def osv_package_name(self, value: str):
self._osv_affected[OSV_PACKAGE][OSV_PACKAGE_NAME] = value
@property
def versions(self) -> Sequence[str]:
"""Returns the list of affected versions of this affected entry."""
return self._osv_affected.get(OSV_VERSIONS, [])
@property
def ecosystem_specific(self) -> MutableMapping[str, Any]:
return self._osv_affected[OSV_ECOSYSTEM_SPECIFIC]
@property
def vanir_signatures(self) -> MutableSequence[signature.Signature]:
return self.ecosystem_specific[OSV_VANIR_SIGNATURES]
@vanir_signatures.setter
def vanir_signatures(self, value: Sequence[signature.Signature]):
self.ecosystem_specific[OSV_VANIR_SIGNATURES] = value
def sort_vanir_signatures(self):
self.ecosystem_specific[OSV_VANIR_SIGNATURES].sort(
key=lambda sig: (
sig.signature_id if isinstance(sig, signature.Signature) else
sig[OSV_SIGNATURE_ID],
)
)
def to_osv_dict(self, use_string_hashes: bool = False) -> Mapping[str, Any]:
"""Returns the affected package as a dict in OSV format."""
osv_affected = copy.deepcopy(self._osv_affected)
if self.vanir_signatures:
osv_affected[OSV_ECOSYSTEM_SPECIFIC][OSV_VANIR_SIGNATURES] = [
sig.to_osv_dict(use_string_hashes)
if isinstance(sig, signature.Signature) else sig
for sig in self.vanir_signatures
]
else:
del osv_affected[OSV_ECOSYSTEM_SPECIFIC][OSV_VANIR_SIGNATURES]
return osv_affected
@dataclasses.dataclass
class Vulnerability:
"""Class for representing a vulnerability entry in OSV format.
Fields definitions: https://ossf.github.io/osv-schema/
"""
schema_version: Optional[str] = None
id: str = ''
modified: str = ''
published: Optional[str] = None
aliases: Optional[Sequence[str]] = None
related: Optional[Sequence[str]] = None
summary: Optional[str] = None
details: Optional[str] = None
references: Optional[Sequence[Mapping[str, str]]] = None
severity: Optional[Mapping[str, str]] = None
affected: Sequence[AffectedEntry] = ()
credits: Optional[Sequence[Mapping[str, str]]] = None
database_specific: Optional[Mapping[str, Any]] = None
def __init__(self, osv_vuln: Mapping[str, Any]):
if not isinstance(osv_vuln, Mapping):
raise ValueError(f'Not a Mapping: {osv_vuln}')
for field in dataclasses.fields(self):
if field.name in osv_vuln and field.name != OSV_AFFECTED:
setattr(self, field.name, copy.deepcopy(osv_vuln[field.name]))
# Convert all affected entries to AffectedEntry objects.
self.affected = []
for osv_affected in osv_vuln.get(OSV_AFFECTED, []):
self.affected.append(AffectedEntry(osv_affected))
if not self.id or not self.modified or not self.affected:
raise ValueError('Missing required fields: id, modified, or affected')
def to_osv_dict(self, use_string_hashes: bool = False) -> Mapping[str, Any]:
"""Returns the vulnerability as a dict in OSV format."""
osv_vuln = dataclasses.asdict(self)
if self.affected:
affected_entries = []
for affected in osv_vuln[OSV_AFFECTED]:
# At this point, affected field should be a list of AffectedEntry due
# to __post_init__(). This assert is needed to make pytype happy.
assert isinstance(affected, AffectedEntry)
affected_entries.append(affected.to_osv_dict(use_string_hashes))
osv_vuln[OSV_AFFECTED] = affected_entries
empty_entries = {key for key in osv_vuln if not osv_vuln[key]}
for key in empty_entries:
del osv_vuln[key]
return osv_vuln
def to_proto(self) -> vulnerability_pb2.Vulnerability:
"""Converts this Vulnerability object to OSV vulnerability proto."""
def _to_proto_dict(
item: Union[Vulnerability, AffectedEntry, signature.Signature],
) -> Mapping[str, Any]:
return item.to_osv_dict(use_string_hashes=True)
return json_format.Parse(
json.dumps(self, default=_to_proto_dict),
vulnerability_pb2.Vulnerability(),
)