Skip to content

Commit

Permalink
platform(terraform): Add eval keys (#6929)
Browse files Browse the repository at this point in the history
* Terraform resource AWS checks only

* flake8 fixes

* fix for Issue 6932

* Add AliCloud

* Terraform Azure

* flake8 fixes

* GCP

* TF Kubernetes

* GitHub

* Other terraform providers

* Fix flake8
  • Loading branch information
tsmithv11 authored Jan 21, 2025
1 parent fc57c4d commit d01c395
Show file tree
Hide file tree
Showing 70 changed files with 237 additions and 38 deletions.
2 changes: 2 additions & 0 deletions checkov/terraform/checks/data/aws/IAMManagedAdminPolicy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ def __init__(self):
def scan_data_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if "name" in conf.keys():
if conf.get("name")[0] == ADMIN_POLICY_NAME:
self.evaluated_keys = ["name"]
return CheckResult.FAILED

if "arn" in conf.keys():
if conf.get("arn")[0] == ADMIN_POLICY_ARN:
self.evaluated_keys = ["arn"]
return CheckResult.FAILED

return CheckResult.PASSED
Expand Down
3 changes: 3 additions & 0 deletions checkov/terraform/checks/resource/alicloud/AbsRDSParameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ def scan_resource_conf(self, conf):
if param['name'][0] == self.parameter and (param['value'][0]).lower() == 'on':
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self):
return ["parameters"]
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
Expand All @@ -23,5 +23,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["encrypted"]


check = DiskEncryptedWithCMK()
9 changes: 7 additions & 2 deletions checkov/terraform/checks/resource/alicloud/DiskIsEncrypted.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
from typing import List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class DiskIsEncrypted(BaseResourceCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure disk is encrypted"
id = "CKV_ALI_7"
supported_resources = ['alicloud_disk']
categories = [CheckCategories.ENCRYPTION]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
def scan_resource_conf(self, conf) -> CheckResult:
if conf.get("snapshot_id"):
return CheckResult.UNKNOWN
if conf.get("encrypted") and conf.get("encrypted") == [True]:
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ['encrypted']


check = DiskIsEncrypted()
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ def __init__(self):
def scan_resource_conf(self, conf):
data_disks = conf.get("data_disks")
if data_disks and isinstance(data_disks, list):
for disk in data_disks:
for idx, disk in enumerate(data_disks):
if disk.get('encrypted') != [True]:
self.evaluated_keys = [f'data_disks/[{idx}]/encrypted']
return CheckResult.FAILED
return CheckResult.PASSED

Expand Down
4 changes: 3 additions & 1 deletion checkov/terraform/checks/resource/aws/AMIEncryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ def __init__(self):
def scan_resource_conf(self, conf) -> CheckResult:
if conf.get('ebs_block_device'):
mappings = conf.get('ebs_block_device')
for mapping in mappings:
self.evaluated_keys = ["ebs_block_device"]
for mapping_idx, mapping in enumerate(mappings):
if not mapping.get("snapshot_id"):
if not mapping.get("encrypted"):
return CheckResult.FAILED
if mapping.get("encrypted")[0] is False:
self.evaluated_keys.append(f"ebs_block_device/[{mapping_idx}]/encrypted")
return CheckResult.FAILED
# pass thru
return CheckResult.PASSED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self) -> None:
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def _is_policy_secure(self, policy: Dict[str, Any]) -> bool:
def _is_policy_secure(self, policy: Dict[str, Any]) -> CheckResult:
# Check that the policy doesn't allow for all principals to us action execute-api:Invoke
passed = True
if policy.get("Statement"):
Expand Down
3 changes: 3 additions & 0 deletions checkov/terraform/checks/resource/aws/AWSCodeGuruHasCMK.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ def scan_resource_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ['kms_key_details']


check = AWSCodeGuruHasCMK()
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self) -> None:
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
self.evaluated_keys = ["container_properties"]
container_properties = conf.get("container_properties")
if container_properties:
if isinstance(container_properties[0], str):
Expand All @@ -28,6 +29,7 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if not isinstance(container, dict):
return CheckResult.UNKNOWN
if container.get("privileged"):
self.evaluated_keys.append("container_properties/[0]/privileged")
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.UNKNOWN
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import \
Expand Down Expand Up @@ -33,5 +33,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["retention_in_days"]


check = CloudWatchLogGroupRetentionYear()
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ def __init__(self) -> None:
def scan_resource_conf(self, conf):
groups = conf.get("origin_group")
if groups and isinstance(groups, list):
for group in groups:
self.evaluated_keys = ["origin_group"]
for group_idx, group in enumerate(groups):
if isinstance(group, dict) and group.get("failover_criteria"):
member = group.get("member")
if not member or len(member) < 2:
self.evaluated_keys.append(f"origin_group/[{group_idx}]/member")
return CheckResult.FAILED
else:
return CheckResult.FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if execute_command and isinstance(execute_command, list):
execute_command = execute_command[0]
if isinstance(execute_command, dict) and not execute_command.get("logging") == ["NONE"]:
self.evaluated_keys = ["configuration/[0]/execute_command_configuration"]
if execute_command.get("kms_key_id"):
log_conf = execute_command.get("log_configuration")
if log_conf and isinstance(log_conf, list):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
Expand Down Expand Up @@ -33,5 +33,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> List[str]:
return ["launch_type", "platform_version"]


check = ECSServiceFargateLatest()
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ def __init__(self) -> None:

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if 'listener' in conf:
for listener in conf.get('listener'):
self.evaluated_keys = ['listener']
for listener_idx, listener in enumerate(conf.get('listener')):
if 'instance_protocol' in listener:
self.evaluated_keys.append(f'listener/[{listener_idx}]/instance_protocol')
if listener.get('instance_protocol')[0].lower() in ('http', 'tcp'):
return CheckResult.FAILED
if listener.get('instance_protocol')[0].lower() in ('https', 'ssl') and \
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.common.util.data_structures_utils import find_in_dict
Expand Down Expand Up @@ -31,5 +31,11 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> List[str]:
return [
"configuration",
"configuration/[0]/EncryptionConfiguration/AtRestEncryptionConfiguration/LocalDiskEncryptionConfiguration/EnableEbsEncryption"
]


check = EMRClusterConfEncryptsEBS()
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> list[str]:
return ["configuration"]


check = EMRClusterConfEncryptsInTransit()
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.common.util.data_structures_utils import find_in_dict
Expand Down Expand Up @@ -31,5 +31,11 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> List[str]:
return [
"configuration",
"configuration/[0]/EncryptionConfiguration/AtRestEncryptionConfiguration/LocalDiskEncryptionConfiguration/EnableAtRestEncryption"
]


check = EMRClusterConfEncryptsLocalDisk()
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck

Expand Down Expand Up @@ -30,5 +32,8 @@ def scan_resource_conf(self, conf):
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["setting"]


check = ElasticBeanstalkUseEnhancedHealthChecks()
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,8 @@ def scan_resource_conf(self, conf):
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self):
return ["setting"]


check = ElasticBeanstalkUseManagedUpdates()
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> list[str]:
return ["log_publishing_options"]


check = ElasticsearchDomainAuditLogging()
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if self.entity_type == "aws_iam_role":
if "managed_policy_arns" in conf.keys():
if ADMIN_POLICY_ARN in conf["managed_policy_arns"][0]:
self.evaluated_keys = ["managed_policy_arns"]
return CheckResult.FAILED

elif self.entity_type in (
Expand All @@ -46,13 +47,15 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
):
policy_arn = conf.get("policy_arn")
if policy_arn and isinstance(policy_arn, list) and policy_arn[0] == ADMIN_POLICY_ARN:
self.evaluated_keys = ["policy_arn"]
return CheckResult.FAILED

elif self.entity_type in (
"aws_ssoadmin_managed_policy_attachment"
):
managed_policy_arn = conf.get("managed_policy_arn")
if managed_policy_arn and isinstance(managed_policy_arn, list) and managed_policy_arn[0] == ADMIN_POLICY_ARN:
self.evaluated_keys = ["managed_policy_arn"]
return CheckResult.FAILED

return CheckResult.PASSED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ def __init__(self) -> None:
def scan_resource_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:
mappings = conf.get("block_device_mapping")
if mappings and isinstance(mappings, list):
self.evaluated_keys = ["block_device_mapping"]
for mapping in mappings:
if mapping.get("ebs"):
self.evaluated_keys.append("block_device_mapping/[0]/ebs")
ebs = mapping["ebs"][0]
if not ebs.get("encrypted"):
return CheckResult.FAILED
Expand Down
5 changes: 5 additions & 0 deletions checkov/terraform/checks/resource/aws/MQBrokerVersion.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import re
from typing import List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck

Expand Down Expand Up @@ -37,5 +39,8 @@ def scan_resource_conf(self, conf) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["engine_type", "engine_version"]


check = MQBrokerVersion()
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def scan_resource_conf(self, conf):
return CheckResult.FAILED

def get_evaluated_keys(self):
return 'backup_retention_period'
return ['backup_retention_period']


check = NeptuneClusterBackupRetention()
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
Expand Down Expand Up @@ -44,5 +44,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["enabled_cloudwatch_logs_exports"]


check = RDSClusterAuditLogging()
5 changes: 3 additions & 2 deletions checkov/terraform/checks/resource/aws/S3GlobalViewACL.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ def __init__(self) -> None:

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if 'access_control_policy' in conf:
for policy in conf.get('access_control_policy'):
for policy_idx, policy in enumerate(conf["access_control_policy"]):
if 'grant' in policy:
for grant in policy.get('grant'):
for grant_idx, grant in enumerate(policy["grant"]):
self.evaluated_keys = [f"access_control_policy/[{policy_idx}]/grant/[{grant_idx}]/permission"]
if (isinstance(grant, dict) and 'permission' in grant and
('FULL_CONTROL' in grant.get('permission') or 'READ_ACP' in grant.get('permission'))):
if 'grantee' in grant:
Expand Down
5 changes: 4 additions & 1 deletion checkov/terraform/checks/resource/aws/SQSQueueEncryption.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
Expand Down Expand Up @@ -29,5 +29,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["sqs_managed_sse_enabled", "kms_master_key_id"]


check = SQSQueueEncryption()
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ def __init__(self) -> None:
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
self.evaluated_keys = ["rotation_rules"]
rules = conf.get("rotation_rules")
if rules and isinstance(rules, list):
days = rules[0].get('automatically_after_days')
if days and isinstance(days, list):
self.evaluated_keys = ["rotation_rules/[0]/automatically_after_days"]
days = force_int(days[0])
if days is not None and days < 90:
return CheckResult.PASSED
Expand Down
Loading

0 comments on commit d01c395

Please sign in to comment.