Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(tags): convert tags to a dictionary #4598

Merged
47 changes: 7 additions & 40 deletions prowler/lib/outputs/asff/asff.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class ASFF(Output):
- transform(findings: list[Finding]) -> None: Transforms a list of findings into ASFF format.
- batch_write_data_to_file() -> None: Writes the findings data to a file in JSON ASFF format.
- generate_status(status: str, muted: bool = False) -> str: Generates the ASFF status based on the provided status and muted flag.
- format_resource_tags(tags: str) -> dict: Transforms a string of tags into a dictionary format.

References:
- AWS Security Hub API Reference: https://docs.aws.amazon.com/securityhub/1.0/APIReference/API_Compliance.html
Expand Down Expand Up @@ -62,15 +61,13 @@ def transform(self, findings: list[Finding]) -> None:
if finding.status == "MANUAL":
continue
timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
resource_tags = ASFF.format_resource_tags(finding.resource_tags)

associated_standards, compliance_summary = ASFF.format_compliance(
finding.compliance
)

# Ensures finding_status matches allowed values in ASFF
finding_status = ASFF.generate_status(finding.status, finding.muted)

self._data.append(
AWSSecurityFindingFormat(
# The following line cannot be changed because it is the format we use to generate unique findings for AWS Security Hub
Expand Down Expand Up @@ -99,7 +96,7 @@ def transform(self, findings: list[Finding]) -> None:
Type=finding.resource_type,
Partition=finding.partition,
Region=finding.region,
Tags=resource_tags,
Tags=finding.resource_tags,
)
],
Compliance=Compliance(
Expand Down Expand Up @@ -195,42 +192,6 @@ def generate_status(status: str, muted: bool = False) -> str:

return json_asff_status

@staticmethod
def format_resource_tags(tags: str) -> dict:
"""
Transforms a string of tags into a dictionary format.

Parameters:
- tags (str): A string containing tags separated by ' | ' and key-value pairs separated by '='.

Returns:
- dict: A dictionary where keys are tag names and values are tag values.

Notes:
- If the input string is empty or None, it returns None.
- Each tag in the input string should be in the format 'key=value'.
- If the input string is not formatted correctly, it logs an error and returns None.
"""
try:
tags_dict = None
if tags:
tags = tags.split(" | ")
tags_dict = {}
for tag in tags:
value = tag.split("=")
tags_dict[value[0]] = value[1]
return tags_dict
except IndexError as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None
except AttributeError as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None

@staticmethod
def format_compliance(compliance: dict) -> tuple[list[dict], list[str]]:
"""
Expand Down Expand Up @@ -316,6 +277,12 @@ class Resource(BaseModel):
Region: str
Tags: Optional[dict]

@validator("Tags", pre=True, always=True)
def tags_cannot_be_empty_dict(tags):
if not tags:
return None
return tags


class Compliance(BaseModel):
"""
Expand Down
11 changes: 8 additions & 3 deletions prowler/lib/outputs/csv/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.output import Output
from prowler.lib.outputs.utils import unroll_dict, unroll_list
from prowler.lib.outputs.utils import unroll_dict


class CSV(Output):
Expand All @@ -17,8 +17,13 @@ def transform(self, findings: list[Finding]) -> None:
try:
for finding in findings:
finding_dict = {k.upper(): v for k, v in finding.dict().items()}
finding_dict["COMPLIANCE"] = unroll_dict(finding.compliance)
finding_dict["ACCOUNT_TAGS"] = unroll_list(finding.account_tags)
finding_dict["RESOURCE_TAGS"] = unroll_dict(finding.resource_tags)
finding_dict["COMPLIANCE"] = unroll_dict(
finding.compliance, separator=": "
)
finding_dict["ACCOUNT_TAGS"] = unroll_dict(
finding.account_tags, separator=":"
)
finding_dict["STATUS"] = finding.status.value
finding_dict["SEVERITY"] = finding.severity.value
self._data.append(finding_dict)
Expand Down
4 changes: 2 additions & 2 deletions prowler/lib/outputs/finding.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Finding(BaseModel):
# Optional since it depends on permissions
account_organization_name: Optional[str]
# Optional since it depends on permissions
account_tags: Optional[list[str]]
account_tags: dict = {}
finding_uid: str
provider: str
check_id: str
Expand All @@ -66,7 +66,7 @@ class Finding(BaseModel):
resource_uid: str
resource_name: str
resource_details: str
resource_tags: str
resource_tags: dict = {}
# Only present for AWS and Azure
partition: Optional[str]
region: str
Expand Down
4 changes: 2 additions & 2 deletions prowler/lib/outputs/html/html.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ def transform(self, findings: list[Finding]) -> None:
<td>{finding.check_id.replace("_", "<wbr />_")}</td>
<td>{finding.check_title}</td>
<td>{finding.resource_uid.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr />_")}</td>
<td>{parse_html_string(finding.resource_tags)}</td>
<td>{parse_html_string(unroll_dict(finding.resource_tags))}</td>
<td>{finding.status_extended.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr />_")}</td>
<td><p class="show-read-more">{html.escape(finding.risk)}</p></td>
<td><p class="show-read-more">{html.escape(finding.remediation_recommendation_text)}</p> <a class="read-more" href="{finding.remediation_recommendation_url}"><i class="fas fa-external-link-alt"></i></a></td>
<td><p class="show-read-more">{parse_html_string(unroll_dict(finding.compliance))}</p></td>
<td><p class="show-read-more">{parse_html_string(unroll_dict(finding.compliance, separator=": "))}</p></td>
</tr>
"""
)
Expand Down
10 changes: 3 additions & 7 deletions prowler/lib/outputs/ocsf/ocsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.output import Output
from prowler.lib.outputs.utils import unroll_dict_to_list


class OCSF(Output):
Expand Down Expand Up @@ -97,12 +98,7 @@ def transform(self, findings: List[Finding]) -> None:
risk_details=finding.risk,
resources=[
ResourceDetails(
# TODO: Check labels for other providers
labels=(
finding.resource_tags.split(",")
if finding.resource_tags
else []
),
labels=unroll_dict_to_list(finding.resource_tags),
name=finding.resource_name,
uid=finding.resource_uid,
group=Group(name=finding.service_name),
Expand Down Expand Up @@ -148,7 +144,7 @@ def transform(self, findings: List[Finding]) -> None:
type_id=cloud_account_type.value,
type=cloud_account_type.name,
uid=finding.account_uid,
labels=finding.account_tags,
labels=unroll_dict_to_list(finding.account_tags),
),
org=Organization(
uid=finding.account_organization_uid,
Expand Down
170 changes: 127 additions & 43 deletions prowler/lib/outputs/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,24 @@
def unroll_list(listed_items: list, separator: str = "|"):
def unroll_list(listed_items: list, separator: str = "|") -> str:
"""
Unrolls a list of items into a single string, separated by a specified separator.

Args:
listed_items (list): The list of items to be unrolled.
separator (str, optional): The separator to be used between the items. Defaults to "|".

Returns:
str: The unrolled string.

Examples:
>>> unroll_list(['apple', 'banana', 'orange'])
'apple | banana | orange'

>>> unroll_list(['apple', 'banana', 'orange'], separator=',')
'apple, banana, orange'

>>> unroll_list([])
''
"""
unrolled_items = ""
if listed_items:
for item in listed_items:
Expand All @@ -13,70 +33,118 @@ def unroll_list(listed_items: list, separator: str = "|"):
return unrolled_items


def unroll_tags(tags: list):
unrolled_items = ""
separator = "|"
def unroll_tags(tags: list) -> dict:
"""
Unrolls a list of tags into a dictionary.

Args:
tags (list): A list of tags.

Returns:
dict: A dictionary containing the unrolled tags.

Examples:
>>> tags = [{"key": "name", "value": "John"}, {"key": "age", "value": "30"}]
>>> unroll_tags(tags)
{'name': 'John', 'age': '30'}

>>> tags = [{"Key": "name", "Value": "John"}, {"Key": "age", "Value": "30"}]
>>> unroll_tags(tags)
{'name': 'John', 'age': '30'}

>>> tags = [{"name": "John", "age": "30"}]
>>> unroll_tags(tags)
{'name': 'John', 'age': '30'}

>>> tags = []
>>> unroll_tags(tags)
{}
"""
if tags and tags != [{}] and tags != [None]:
for item in tags:
# Check if there are tags in list
if isinstance(item, dict):
for key, value in item.items():
if not unrolled_items:
# Check the pattern of tags (Key:Value or Key:key/Value:value)
if "Key" != key and "Value" != key:
unrolled_items = f"{key}={value}"
else:
if "Key" == key:
unrolled_items = f"{value}="
else:
unrolled_items = f"{value}"
else:
if "Key" != key and "Value" != key:
unrolled_items = (
f"{unrolled_items} {separator} {key}={value}"
)
else:
if "Key" == key:
unrolled_items = (
f"{unrolled_items} {separator} {value}="
)
else:
unrolled_items = f"{unrolled_items}{value}"
elif not unrolled_items:
unrolled_items = f"{item}"
else:
unrolled_items = f"{unrolled_items} {separator} {item}"
if "key" in tags[0]:
return {item["key"]: item["value"] for item in tags}
elif "Key" in tags[0]:
return {item["Key"]: item["Value"] for item in tags}
else:
return {key: value for d in tags for key, value in d.items()}
return {}

return unrolled_items

def unroll_dict(dict: dict, separator: str = "=") -> str:
"""
Unrolls a dictionary into a string representation.

Args:
dict (dict): The dictionary to be unrolled.

Returns:
str: The unrolled string representation of the dictionary.

Examples:
>>> my_dict = {'name': 'John', 'age': 30, 'hobbies': ['reading', 'coding']}
>>> unroll_dict(my_dict)
'name: John | age: 30 | hobbies: reading, coding'
"""

def unroll_dict(dict: dict):
unrolled_items = ""
separator = "|"
for key, value in dict.items():
if isinstance(value, list):
value = ", ".join(value)
if not unrolled_items:
unrolled_items = f"{key}: {value}"
unrolled_items = f"{key}{separator}{value}"
else:
unrolled_items = f"{unrolled_items} {separator} {key}: {value}"
unrolled_items = f"{unrolled_items} | {key}{separator}{value}"

return unrolled_items


def unroll_dict_to_list(dict: dict):
def unroll_dict_to_list(dict: dict) -> list:
"""
Unrolls a dictionary into a list of key-value pairs.

Args:
dict (dict): The dictionary to be unrolled.

Returns:
list: A list of key-value pairs, where each pair is represented as a string.

Examples:
>>> my_dict = {'name': 'John', 'age': 30, 'hobbies': ['reading', 'coding']}
>>> unroll_dict_to_list(my_dict)
['name: John', 'age: 30', 'hobbies: reading, coding']
"""

dict_list = []
for key, value in dict.items():
if isinstance(value, list):
value = ", ".join(value)
dict_list.append(f"{key}: {value}")
dict_list.append(f"{key}:{value}")
else:
dict_list.append(f"{key}: {value}")
dict_list.append(f"{key}:{value}")

return dict_list


def parse_json_tags(tags: list):
def parse_json_tags(tags: list) -> dict[str, str]:
"""
Parses a list of JSON tags and returns a dictionary of key-value pairs.

Args:
tags (list): A list of JSON tags.

Returns:
dict: A dictionary containing the parsed key-value pairs from the tags.

Examples:
>>> tags = [
... {"Key": "Name", "Value": "John"},
... {"Key": "Age", "Value": "30"},
... {"Key": "City", "Value": "New York"}
... ]
>>> parse_json_tags(tags)
{'Name': 'John', 'Age': '30', 'City': 'New York'}
"""

dict_tags = {}
if tags and tags != [{}] and tags != [None]:
for tag in tags:
Expand All @@ -88,7 +156,23 @@ def parse_json_tags(tags: list):
return dict_tags


def parse_html_string(str: str):
def parse_html_string(str: str) -> str:
"""
Parses a string and returns a formatted HTML string.

This function takes an input string and splits it using the delimiter " | ".
It then formats each element of the split string as a bullet point in HTML format.

Args:
str (str): The input string to be parsed.

Returns:
str: The formatted HTML string.

Example:
>>> parse_html_string("item1 | item2 | item3")
'\n&#x2022;item1\n\n&#x2022;item2\n\n&#x2022;item3\n'
"""
string = ""
for elem in str.split(" | "):
if elem:
Expand Down
Loading