diff --git a/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json b/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json index a8b69e6a0bf..556219e2829 100644 --- a/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json +++ b/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json @@ -419,6 +419,20 @@ "impact": "E-mail from this domain can potentially be spoofed if DMARC is not (properly) implemented in combination with DKIM and SPF.", "recommendation": "Fix the syntax of the SPF record." }, + "KAT-EXPENSIVE-SPF": { + "description": "This SPF record contains an expensive SPF construction.", + "source": "https://www.rfc-editor.org/rfc/rfc7208#section-4.6.4", + "risk": "low", + "impact": "Various recipient mailservers might not perform all requested lookups and bounce email because of missed allowed addresses, or bounce mail entirely due to too many dns lookups.", + "recommendation": "Consolidate the SPF record, remove unneeded lookups and mechanisms." + }, + "KAT-DEPRECATED-SPF-MECHANISM": { + "description": "This SPF record contains a deprecated SPF mechanism.", + "source": "https://www.rfc-editor.org/rfc/rfc7208#section-5.5", + "risk": "low", + "impact": "Deprecated mechanism is used. It should not be used.", + "recommendation": "Fix the SPF record, remove deprecated mechanisms." + }, "SUB-DOMAIN-TAKEOVER": { "description": "Subdomain takeover is when an attacker takes control of an unused or improperly configured subdomain, potentially accessing sensitive information or conducting phishing attacks.", "source": "https://developer.mozilla.org/en-US/docs/Web/Security/Subdomain_takeovers", diff --git a/octopoes/bits/spf_discovery/spf_discovery.py b/octopoes/bits/spf_discovery/spf_discovery.py index a094cc28cfb..15293935956 100644 --- a/octopoes/bits/spf_discovery/spf_discovery.py +++ b/octopoes/bits/spf_discovery/spf_discovery.py @@ -5,7 +5,12 @@ from octopoes.models import OOI from octopoes.models.ooi.dns.records import DNSTXTRecord from octopoes.models.ooi.dns.zone import Hostname -from octopoes.models.ooi.email_security import DNSSPFMechanismHostname, DNSSPFMechanismIP, DNSSPFRecord +from octopoes.models.ooi.email_security import ( + DNSSPFMechanismHostname, + DNSSPFMechanismIP, + DNSSPFRecord, + MechanismQualifier, +) from octopoes.models.ooi.findings import Finding, KATFindingType from octopoes.models.ooi.network import IPAddressV4, IPAddressV6, Network @@ -19,20 +24,23 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any]) spf_record = DNSSPFRecord(dns_txt_record=input_ooi.reference, value=input_ooi.value, ttl=input_ooi.ttl) # walk through all mechanisms for mechanism in parsed[1]: + # strip of optional mechanism qualifiers + # http://www.open-spf.org/SPF_Record_Syntax/ + mechanism_qualifier = MechanismQualifier("+") + if mechanism.startswith(("+", "-", "~", "?")): + mechanism_qualifier = mechanism[0] + mechanism = mechanism[1:] + mechanism_qualifier = MechanismQualifier(mechanism_qualifier) + # ip4 and ip6 mechanisms if mechanism.startswith(("ip4:", "ip6:")): - yield from parse_ip_qualifiers(mechanism, input_ooi, spf_record) + yield from parse_ip_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record) # a mechanisms and mx mechanisms have the same syntax - if mechanism.startswith("a") or mechanism.startswith("mx"): - yield from parse_a_mx_qualifiers(mechanism, input_ooi, spf_record) + if not mechanism.startswith("all") and mechanism.startswith("a") or mechanism.startswith("mx"): + yield from parse_a_mx_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record) # exists ptr and include mechanisms have a similar syntax - if ( - mechanism.startswith("exists") - or mechanism.startswith("ptr") - or mechanism.startswith("include") - or mechanism.startswith("?include") - ): - yield from parse_ptr_exists_include_mechanism(mechanism, input_ooi, spf_record) + if mechanism.startswith("exists") or mechanism.startswith("ptr") or mechanism.startswith("include"): + yield from parse_ptr_exists_include_mechanism(mechanism_qualifier, mechanism, input_ooi, spf_record) # redirect mechanisms if mechanism.startswith("redirect"): yield from parse_redirect_mechanism(mechanism, input_ooi, spf_record) @@ -40,7 +48,7 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any]) if mechanism.startswith("exp"): spf_record.exp = mechanism.split("=", 1)[1] if mechanism.endswith("all"): - spf_record.all = mechanism.strip("all") + spf_record.all = mechanism_qualifier.value yield spf_record else: ft = KATFindingType(id="KAT-INVALID-SPF") @@ -48,7 +56,9 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any]) yield Finding(finding_type=ft.reference, ooi=input_ooi.reference, description="This SPF record is invalid") -def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]: +def parse_ip_qualifiers( + mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord +) -> Iterator[OOI]: # split mechanism into qualifier and ip qualifier, ip = mechanism.split(":", 1) ip = mechanism[4:] @@ -62,46 +72,61 @@ def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNS address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference ) yield ip_address - yield DNSSPFMechanismIP(spf_record=spf_record.reference, ip=ip_address.reference, mechanism="ip4") + yield DNSSPFMechanismIP( + spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip4" + ) if qualifier == "ip6": ip_address = IPAddressV6( address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference ) yield ip_address yield DNSSPFMechanismIP( - spf_record=spf_record.reference, ip=ip_address.reference, qualifier=qualifier, mechanism="ip6" + spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip6" ) -def parse_a_mx_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]: +def parse_a_mx_qualifiers( + mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord +) -> Iterator[OOI]: if mechanism == "a" or mechanism == "mx": - yield DNSSPFMechanismHostname(spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism=mechanism) - else: - mechanism_type, domain = mechanism.split(":", 1) - # remove prefix-length for now - # TODO: fix prefix lengths - domain = domain.split("/")[0] - hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference) - yield hostname yield DNSSPFMechanismHostname( - spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type + spf_record=spf_record.reference, + hostname=input_ooi.hostname, + mechanism=mechanism, + qualifier=mechanism_qualifier, ) - if mechanism.startswith("a/") or mechanism.startswith("mx/"): - mechanism_type, domain = mechanism.split("/", 1) + else: + if mechanism.startswith("a/") or mechanism.startswith("mx/"): + mechanism_type, domain = mechanism.split("/", 1) + else: + mechanism_type, domain = mechanism.split(":", 1) + # remove prefix-length for now # TODO: fix prefix lengths domain = domain.split("/")[0] hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference) yield hostname yield DNSSPFMechanismHostname( - spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type + spf_record=spf_record.reference, + hostname=hostname.reference, + mechanism=mechanism_type, + qualifier=mechanism_qualifier, ) def parse_ptr_exists_include_mechanism( - mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord + mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord ) -> Iterator[OOI]: if mechanism == "ptr": - yield DNSSPFMechanismHostname(spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism="ptr") + yield DNSSPFMechanismHostname( + spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism="ptr", qualifier=mechanism_qualifier + ) + ft = KATFindingType(id="KAT-DEPRECATED-SPF-MECHANISM") + yield ft + yield Finding( + finding_type=ft.reference, + ooi=input_ooi.reference, + description="This SPF record contains a PTR mechanism, Use of PTR is deprecated.", + ) else: mechanism_type, domain = mechanism.split(":", 1) # currently, the model only supports hostnames and not domains @@ -110,7 +135,10 @@ def parse_ptr_exists_include_mechanism( hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference) yield hostname yield DNSSPFMechanismHostname( - spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type + spf_record=spf_record.reference, + hostname=hostname.reference, + mechanism=mechanism_type, + qualifier=mechanism_qualifier, ) diff --git a/octopoes/octopoes/models/__init__.py b/octopoes/octopoes/models/__init__.py index 362b4c0d254..fc6915cf9f3 100644 --- a/octopoes/octopoes/models/__init__.py +++ b/octopoes/octopoes/models/__init__.py @@ -200,7 +200,7 @@ def hydrate(node: dict[str, dict | str]) -> dict | str: if isinstance(value, dict): node[key] = hydrate(value) else: - node[key] = natural_key_parts.pop(0) + node[key] = natural_key_parts.pop(0) if natural_key_parts else value return node return PrimaryKeyToken.model_validate(hydrate(token_tree)) @@ -236,8 +236,7 @@ def _serialize_value(self, value: Any, required: bool) -> SerializedOOIValue: return value.value if isinstance(value, int | float): return value - else: - return str(value) + return str(value) def __hash__(self): return hash(self.primary_key) @@ -288,7 +287,7 @@ def build_token_tree(ooi_class: type[OOI]) -> dict[str, dict | str]: # combine trees tokens[attribute] = {key: value_ for tree in trees for key, value_ in tree.items()} else: - tokens[attribute] = "" + tokens[attribute] = field.default return tokens diff --git a/octopoes/octopoes/models/ooi/email_security.py b/octopoes/octopoes/models/ooi/email_security.py index e21615288fa..a31fb2a1be3 100644 --- a/octopoes/octopoes/models/ooi/email_security.py +++ b/octopoes/octopoes/models/ooi/email_security.py @@ -1,4 +1,5 @@ import hashlib +from enum import Enum from typing import Literal from octopoes.models import OOI, Reference @@ -30,6 +31,22 @@ def format_reference_human_readable(cls, reference: Reference) -> str: return f"SPF Record of {reference.tokenized.dns_txt_record.hostname.name}" +class MechanismQualifier(Enum): + ALLOW = "+" + FAIL = "-" + SOFTFAIL = "~" + NEUTRAL = "?" + + # the string representation maps to a human readable format of the qualifier + def __str__(self): + return { + MechanismQualifier.ALLOW: "Allow", + MechanismQualifier.FAIL: "Fail", + MechanismQualifier.SOFTFAIL: "Softfail", + MechanismQualifier.NEUTRAL: "Neutral", + }[self] + + class DNSSPFMechanism(OOI): spf_record: Reference = ReferenceField(DNSSPFRecord, max_inherit_scan_level=1) mechanism: str @@ -39,16 +56,17 @@ class DNSSPFMechanismIP(DNSSPFMechanism): object_type: Literal["DNSSPFMechanismIP"] = "DNSSPFMechanismIP" ip: Reference = ReferenceField(IPAddress) + qualifier: MechanismQualifier = MechanismQualifier.ALLOW - _natural_key_attrs = ["spf_record", "mechanism", "ip"] - _information_value = ["mechanism"] + _natural_key_attrs = ["spf_record", "mechanism", "ip", "qualifier"] + _information_value = ["mechanism", "qualifier"] _reverse_relation_names = {"spf_record": "spf_ip_mechanisms"} @classmethod def format_reference_human_readable(cls, reference: Reference) -> str: return ( - f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name}" - f"{reference.tokenized.mechanism} {reference.tokenized.ip.address}" + f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.ip.address}" + f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}" ) @@ -56,16 +74,17 @@ class DNSSPFMechanismHostname(DNSSPFMechanism): object_type: Literal["DNSSPFMechanismHostname"] = "DNSSPFMechanismHostname" hostname: Reference = ReferenceField(Hostname) + qualifier: MechanismQualifier = MechanismQualifier.ALLOW - _natural_key_attrs = ["spf_record", "mechanism", "hostname"] - _information_value = ["mechanism"] + _natural_key_attrs = ["spf_record", "mechanism", "hostname", "qualifier"] + _information_value = ["mechanism", "qualifier"] _reverse_relation_names = {"spf_record": "spf_hostname_mechanisms"} @classmethod def format_reference_human_readable(cls, reference: Reference) -> str: return ( - f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} " - f"{reference.tokenized.mechanism} {reference.tokenized.hostname.name}" + f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.hostname.name}" + f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}" ) @@ -73,17 +92,18 @@ class DNSSPFMechanismNetBlock(DNSSPFMechanism): object_type: Literal["DNSSPFMechanismNetBlock"] = "DNSSPFMechanismNetBlock" netblock: Reference = ReferenceField(NetBlock) + qualifier: MechanismQualifier = MechanismQualifier.ALLOW - _natural_key_attrs = ["spf_record", "mechanism", "netblock"] - _information_value = ["mechanism"] + _natural_key_attrs = ["spf_record", "mechanism", "netblock", "qualifier"] + _information_value = ["mechanism", "qualifier"] _reverse_relation_names = {"spf_record": "spf_netblock_mechanisms"} @classmethod def format_reference_human_readable(cls, reference: Reference) -> str: return ( - f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} " - f" {reference.tokenized.mechanism} {reference.tokenized.netblock.start_ip}" - f"/{reference.tokenized.netblock.mask}" + f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:" + f"{reference.tokenized.netblock.start_ip}/{reference.tokenized.netblock.mask}" + f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}" )