Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(tags): convert tags to a dictionary #4598

Merged
45 changes: 5 additions & 40 deletions prowler/lib/outputs/asff/asff.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class ASFF(Output):
- transform(findings: list[Finding]) -> None: Transforms a list of findings into ASFF format.
- batch_write_data_to_file() -> None: Writes the findings data to a file in JSON ASFF format.
- generate_status(status: str, muted: bool = False) -> str: Generates the ASFF status based on the provided status and muted flag.
- format_resource_tags(tags: str) -> dict: Transforms a string of tags into a dictionary format.

References:
- AWS Security Hub API Reference: https://docs.aws.amazon.com/securityhub/1.0/APIReference/API_Compliance.html
Expand Down Expand Up @@ -62,15 +61,13 @@ def transform(self, findings: list[Finding]) -> None:
if finding.status == "MANUAL":
continue
timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
resource_tags = ASFF.format_resource_tags(finding.resource_tags)

associated_standards, compliance_summary = ASFF.format_compliance(
finding.compliance
)

# Ensures finding_status matches allowed values in ASFF
finding_status = ASFF.generate_status(finding.status, finding.muted)

self._data.append(
AWSSecurityFindingFormat(
# The following line cannot be changed because it is the format we use to generate unique findings for AWS Security Hub
Expand Down Expand Up @@ -99,7 +96,11 @@ def transform(self, findings: list[Finding]) -> None:
Type=finding.resource_type,
Partition=finding.partition,
Region=finding.region,
Tags=resource_tags,
Tags=(
finding.resource_tags
if finding.resource_tags
else None
),
sergargar marked this conversation as resolved.
Show resolved Hide resolved
)
],
Compliance=Compliance(
Expand Down Expand Up @@ -195,42 +196,6 @@ def generate_status(status: str, muted: bool = False) -> str:

return json_asff_status

@staticmethod
def format_resource_tags(tags: str) -> dict:
"""
Transforms a string of tags into a dictionary format.

Parameters:
- tags (str): A string containing tags separated by ' | ' and key-value pairs separated by '='.

Returns:
- dict: A dictionary where keys are tag names and values are tag values.

Notes:
- If the input string is empty or None, it returns None.
- Each tag in the input string should be in the format 'key=value'.
- If the input string is not formatted correctly, it logs an error and returns None.
"""
try:
tags_dict = None
if tags:
tags = tags.split(" | ")
tags_dict = {}
for tag in tags:
value = tag.split("=")
tags_dict[value[0]] = value[1]
return tags_dict
except IndexError as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None
except AttributeError as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None

@staticmethod
def format_compliance(compliance: dict) -> tuple[list[dict], list[str]]:
"""
Expand Down
11 changes: 8 additions & 3 deletions prowler/lib/outputs/csv/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.output import Output
from prowler.lib.outputs.utils import unroll_dict, unroll_list
from prowler.lib.outputs.utils import unroll_dict


class CSV(Output):
Expand All @@ -17,8 +17,13 @@ def transform(self, findings: list[Finding]) -> None:
try:
for finding in findings:
finding_dict = {k.upper(): v for k, v in finding.dict().items()}
finding_dict["COMPLIANCE"] = unroll_dict(finding.compliance)
finding_dict["ACCOUNT_TAGS"] = unroll_list(finding.account_tags)
finding_dict["RESOURCE_TAGS"] = unroll_dict(finding.resource_tags)
finding_dict["COMPLIANCE"] = unroll_dict(
finding.compliance, separator=": "
)
finding_dict["ACCOUNT_TAGS"] = unroll_dict(
finding.account_tags, separator=":"
)
finding_dict["STATUS"] = finding.status.value
finding_dict["SEVERITY"] = finding.severity.value
self._data.append(finding_dict)
Expand Down
4 changes: 2 additions & 2 deletions prowler/lib/outputs/finding.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Finding(BaseModel):
# Optional since it depends on permissions
account_organization_name: Optional[str]
# Optional since it depends on permissions
account_tags: Optional[list[str]]
account_tags: dict = {}
finding_uid: str
provider: str
check_id: str
Expand All @@ -66,7 +66,7 @@ class Finding(BaseModel):
resource_uid: str
resource_name: str
resource_details: str
resource_tags: str
resource_tags: dict = {}
# Only present for AWS and Azure
partition: Optional[str]
region: str
Expand Down
4 changes: 2 additions & 2 deletions prowler/lib/outputs/html/html.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ def transform(self, findings: list[Finding]) -> None:
<td>{finding.check_id.replace("_", "<wbr />_")}</td>
<td>{finding.check_title}</td>
<td>{finding.resource_uid.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr />_")}</td>
<td>{parse_html_string(finding.resource_tags)}</td>
<td>{parse_html_string(unroll_dict(finding.resource_tags))}</td>
<td>{finding.status_extended.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr />_")}</td>
<td><p class="show-read-more">{html.escape(finding.risk)}</p></td>
<td><p class="show-read-more">{html.escape(finding.remediation_recommendation_text)}</p> <a class="read-more" href="{finding.remediation_recommendation_url}"><i class="fas fa-external-link-alt"></i></a></td>
<td><p class="show-read-more">{parse_html_string(unroll_dict(finding.compliance))}</p></td>
<td><p class="show-read-more">{parse_html_string(unroll_dict(finding.compliance, separator=": "))}</p></td>
</tr>
"""
)
Expand Down
10 changes: 3 additions & 7 deletions prowler/lib/outputs/ocsf/ocsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.output import Output
from prowler.lib.outputs.utils import unroll_dict_to_list


class OCSF(Output):
Expand Down Expand Up @@ -97,12 +98,7 @@ def transform(self, findings: List[Finding]) -> None:
risk_details=finding.risk,
resources=[
ResourceDetails(
# TODO: Check labels for other providers
labels=(
finding.resource_tags.split(",")
if finding.resource_tags
else []
),
labels=unroll_dict_to_list(finding.resource_tags),
name=finding.resource_name,
uid=finding.resource_uid,
group=Group(name=finding.service_name),
Expand Down Expand Up @@ -148,7 +144,7 @@ def transform(self, findings: List[Finding]) -> None:
type_id=cloud_account_type.value,
type=cloud_account_type.name,
uid=finding.account_uid,
labels=finding.account_tags,
labels=unroll_dict_to_list(finding.account_tags),
),
org=Organization(
uid=finding.account_organization_uid,
Expand Down
69 changes: 30 additions & 39 deletions prowler/lib/outputs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,64 +14,55 @@


def unroll_tags(tags: list):
unrolled_items = ""
separator = "|"
"""
Unrolls tags from a list of dictionaries into a single dictionary.
Example:
tags = [{'Key': 'Name', 'Value': 'test'}, {'Key': 'Environment', 'Value': 'test'}]
unroll_tags(tags) -> {'Name': 'test', 'Environment': 'test'}
"""
if tags and tags != [{}] and tags != [None]:
for item in tags:
# Check if there are tags in list
if isinstance(item, dict):
for key, value in item.items():
if not unrolled_items:
# Check the pattern of tags (Key:Value or Key:key/Value:value)
if "Key" != key and "Value" != key:
unrolled_items = f"{key}={value}"
else:
if "Key" == key:
unrolled_items = f"{value}="
else:
unrolled_items = f"{value}"
else:
if "Key" != key and "Value" != key:
unrolled_items = (
f"{unrolled_items} {separator} {key}={value}"
)
else:
if "Key" == key:
unrolled_items = (
f"{unrolled_items} {separator} {value}="
)
else:
unrolled_items = f"{unrolled_items}{value}"
elif not unrolled_items:
unrolled_items = f"{item}"
else:
unrolled_items = f"{unrolled_items} {separator} {item}"

return unrolled_items
if "key" in tags[0]:
return {item["key"]: item["value"] for item in tags}

Check warning on line 25 in prowler/lib/outputs/utils.py

View check run for this annotation

Codecov / codecov/patch

prowler/lib/outputs/utils.py#L25

Added line #L25 was not covered by tests
elif "Key" in tags[0]:
return {item["Key"]: item["Value"] for item in tags}
else:
return {key: value for d in tags for key, value in d.items()}
return {}


def unroll_dict(dict: dict):
def unroll_dict(dict: dict, separator: str = "="):
"""
Unrolls a dictionary into a string.
Example:
dict = {'Name': 'test', 'Environment': 'test'}
unroll_dict(dict) -> 'Name=test | Environment=test'
"""
unrolled_items = ""
separator = "|"
for key, value in dict.items():
if isinstance(value, list):
value = ", ".join(value)
if not unrolled_items:
unrolled_items = f"{key}: {value}"
unrolled_items = f"{key}{separator}{value}"
else:
unrolled_items = f"{unrolled_items} {separator} {key}: {value}"
unrolled_items = f"{unrolled_items} | {key}{separator}{value}"

return unrolled_items


def unroll_dict_to_list(dict: dict):
"""
Unrolls a dictionary into a list.
Example:
dict = {'Name': 'test', 'Environment': 'dev'}
unroll_dict_to_list(dict) -> ['Name=test', 'Environment=dev']
"""
dict_list = []
for key, value in dict.items():
if isinstance(value, list):
value = ", ".join(value)
dict_list.append(f"{key}: {value}")
dict_list.append(f"{key}={value}")
else:
dict_list.append(f"{key}: {value}")
dict_list.append(f"{key}={value}")

return dict_list

Expand Down
4 changes: 2 additions & 2 deletions prowler/providers/aws/lib/mutelist/mutelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from prowler.lib.logger import logger
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.outputs.utils import unroll_dict, unroll_tags


class AWSMutelist(Mutelist):
Expand Down Expand Up @@ -53,7 +53,7 @@ def is_finding_muted(
finding.check_metadata.CheckID,
finding.region,
finding.resource_id,
unroll_tags(finding.resource_tags),
unroll_dict(unroll_tags(finding.resource_tags)),
)

def get_mutelist_file_from_s3(self, aws_session: Session = None):
Expand Down
4 changes: 2 additions & 2 deletions prowler/providers/aws/lib/organizations/organizations.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ def get_organizations_metadata(
def parse_organizations_metadata(metadata: dict, tags: dict) -> AWSOrganizationsInfo:
try:
# Convert Tags dictionary to String
account_details_tags = []
account_details_tags = {}
for tag in tags.get("Tags", {}):
account_details_tags.append(f"{tag['Key']}:{tag['Value']}")
account_details_tags[tag["Key"]] = tag["Value"]

account_details = metadata.get("Account", {})

Expand Down
4 changes: 2 additions & 2 deletions prowler/providers/azure/lib/mutelist/mutelist.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any

from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.outputs.utils import unroll_dict, unroll_tags


class AzureMutelist(Mutelist):
Expand All @@ -14,5 +14,5 @@ def is_finding_muted(
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
unroll_tags(finding.resource_tags),
unroll_dict(unroll_tags(finding.resource_tags)),
)
4 changes: 2 additions & 2 deletions prowler/providers/gcp/gcp_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,9 @@
response = request.execute()

for project in response.get("projects", []):
labels = []
labels = {}

Check warning on line 282 in prowler/providers/gcp/gcp_provider.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/gcp/gcp_provider.py#L282

Added line #L282 was not covered by tests
for key, value in project.get("labels", {}).items():
labels.append(f"{key}:{value}")
labels[key] = value

Check warning on line 284 in prowler/providers/gcp/gcp_provider.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/gcp/gcp_provider.py#L284

Added line #L284 was not covered by tests

project_id = project["projectId"]
gcp_project = GCPProject(
Expand Down
4 changes: 2 additions & 2 deletions prowler/providers/gcp/lib/mutelist/mutelist.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any

from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.outputs.utils import unroll_dict, unroll_tags


class GCPMutelist(Mutelist):
Expand All @@ -14,5 +14,5 @@ def is_finding_muted(
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
unroll_tags(finding.resource_tags),
unroll_dict(unroll_tags(finding.resource_tags)),
)
2 changes: 1 addition & 1 deletion prowler/providers/gcp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class GCPProject(BaseModel):
id: str
name: str
organization: Optional[GCPOrganization]
labels: list[str]
labels: dict
lifecycle_state: str


Expand Down
4 changes: 2 additions & 2 deletions prowler/providers/kubernetes/lib/mutelist/mutelist.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any

from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.outputs.utils import unroll_dict, unroll_tags


class KubernetesMutelist(Mutelist):
Expand All @@ -15,5 +15,5 @@ def is_finding_muted(
finding.check_metadata.CheckID,
finding.namespace,
finding.resource_name,
unroll_tags(finding.resource_tags),
unroll_dict(unroll_tags(finding.resource_tags)),
)
Loading