Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/nibbles
Browse files Browse the repository at this point in the history
  • Loading branch information
originalsouth committed Jan 13, 2025
2 parents 1a2b1e7 + 7514b64 commit b9470e4
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,20 @@
"impact": "E-mail from this domain can potentially be spoofed if DMARC is not (properly) implemented in combination with DKIM and SPF.",
"recommendation": "Fix the syntax of the SPF record."
},
"KAT-EXPENSIVE-SPF": {
"description": "This SPF record contains an expensive SPF construction.",
"source": "https://www.rfc-editor.org/rfc/rfc7208#section-4.6.4",
"risk": "low",
"impact": "Various recipient mailservers might not perform all requested lookups and bounce email because of missed allowed addresses, or bounce mail entirely due to too many dns lookups.",
"recommendation": "Consolidate the SPF record, remove unneeded lookups and mechanisms."
},
"KAT-DEPRECATED-SPF-MECHANISM": {
"description": "This SPF record contains a deprecated SPF mechanism.",
"source": "https://www.rfc-editor.org/rfc/rfc7208#section-5.5",
"risk": "low",
"impact": "Deprecated mechanism is used. It should not be used.",
"recommendation": "Fix the SPF record, remove deprecated mechanisms."
},
"SUB-DOMAIN-TAKEOVER": {
"description": "Subdomain takeover is when an attacker takes control of an unused or improperly configured subdomain, potentially accessing sensitive information or conducting phishing attacks.",
"source": "https://developer.mozilla.org/en-US/docs/Web/Security/Subdomain_takeovers",
Expand Down
90 changes: 59 additions & 31 deletions octopoes/nibbles/spf_discovery/spf_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
from octopoes.models import OOI
from octopoes.models.ooi.dns.records import DNSTXTRecord
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.email_security import DNSSPFMechanismHostname, DNSSPFMechanismIP, DNSSPFRecord
from octopoes.models.ooi.email_security import (
DNSSPFMechanismHostname,
DNSSPFMechanismIP,
DNSSPFRecord,
MechanismQualifier,
)
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import IPAddressV4, IPAddressV6, Network

Expand All @@ -18,36 +23,41 @@ def nibble(input_ooi: DNSTXTRecord) -> Iterator[OOI]:
spf_record = DNSSPFRecord(dns_txt_record=input_ooi.reference, value=input_ooi.value, ttl=input_ooi.ttl)
# walk through all mechanisms
for mechanism in parsed[1]:
# strip of optional mechanism qualifiers
# http://www.open-spf.org/SPF_Record_Syntax/
mechanism_qualifier = MechanismQualifier("+")
if mechanism.startswith(("+", "-", "~", "?")):
mechanism_qualifier = mechanism[0]
mechanism = mechanism[1:]
mechanism_qualifier = MechanismQualifier(mechanism_qualifier)

# ip4 and ip6 mechanisms
if mechanism.startswith(("ip4:", "ip6:")):
yield from parse_ip_qualifiers(mechanism, input_ooi, spf_record)
yield from parse_ip_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record)
# a mechanisms and mx mechanisms have the same syntax
if mechanism.startswith("a") or mechanism.startswith("mx"):
yield from parse_a_mx_qualifiers(mechanism, input_ooi, spf_record)
if not mechanism.startswith("all") and mechanism.startswith("a") or mechanism.startswith("mx"):
yield from parse_a_mx_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record)
# exists ptr and include mechanisms have a similar syntax
if (
mechanism.startswith("exists")
or mechanism.startswith("ptr")
or mechanism.startswith("include")
or mechanism.startswith("?include")
):
yield from parse_ptr_exists_include_mechanism(mechanism, input_ooi, spf_record)
if mechanism.startswith("exists") or mechanism.startswith("ptr") or mechanism.startswith("include"):
yield from parse_ptr_exists_include_mechanism(mechanism_qualifier, mechanism, input_ooi, spf_record)
# redirect mechanisms
if mechanism.startswith("redirect"):
yield from parse_redirect_mechanism(mechanism, input_ooi, spf_record)
# exp mechanism is handled separately because does not necessarily have a hostname
if mechanism.startswith("exp"):
spf_record.exp = mechanism.split("=", 1)[1]
if mechanism.endswith("all"):
spf_record.all = mechanism.strip("all")
spf_record.all = mechanism_qualifier.value
yield spf_record
else:
ft = KATFindingType(id="KAT-INVALID-SPF")
yield ft
yield Finding(finding_type=ft.reference, ooi=input_ooi.reference, description="This SPF record is invalid")


def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]:
def parse_ip_qualifiers(
mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
# split mechanism into qualifier and ip
qualifier, ip = mechanism.split(":", 1)
ip = mechanism[4:]
Expand All @@ -61,46 +71,61 @@ def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNS
address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference
)
yield ip_address
yield DNSSPFMechanismIP(spf_record=spf_record.reference, ip=ip_address.reference, mechanism="ip4")
yield DNSSPFMechanismIP(
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip4"
)
if qualifier == "ip6":
ip_address = IPAddressV6(
address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference
)
yield ip_address
yield DNSSPFMechanismIP(
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=qualifier, mechanism="ip6"
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip6"
)


def parse_a_mx_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]:
def parse_a_mx_qualifiers(
mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
if mechanism == "a" or mechanism == "mx":
yield DNSSPFMechanismHostname(spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism=mechanism)
else:
mechanism_type, domain = mechanism.split(":", 1)
# remove prefix-length for now
# TODO: fix prefix lengths
domain = domain.split("/")[0]
hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference)
yield hostname
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type
spf_record=spf_record.reference,
hostname=input_ooi.hostname,
mechanism=mechanism,
qualifier=mechanism_qualifier,
)
if mechanism.startswith("a/") or mechanism.startswith("mx/"):
mechanism_type, domain = mechanism.split("/", 1)
else:
if mechanism.startswith("a/") or mechanism.startswith("mx/"):
mechanism_type, domain = mechanism.split("/", 1)
else:
mechanism_type, domain = mechanism.split(":", 1)
# remove prefix-length for now
# TODO: fix prefix lengths
domain = domain.split("/")[0]
hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference)
yield hostname
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type
spf_record=spf_record.reference,
hostname=hostname.reference,
mechanism=mechanism_type,
qualifier=mechanism_qualifier,
)


def parse_ptr_exists_include_mechanism(
mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
if mechanism == "ptr":
yield DNSSPFMechanismHostname(spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism="ptr")
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism="ptr", qualifier=mechanism_qualifier
)
ft = KATFindingType(id="KAT-DEPRECATED-SPF-MECHANISM")
yield ft
yield Finding(
finding_type=ft.reference,
ooi=input_ooi.reference,
description="This SPF record contains a PTR mechanism, Use of PTR is deprecated.",
)
else:
mechanism_type, domain = mechanism.split(":", 1)
# currently, the model only supports hostnames and not domains
Expand All @@ -109,7 +134,10 @@ def parse_ptr_exists_include_mechanism(
hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference)
yield hostname
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type
spf_record=spf_record.reference,
hostname=hostname.reference,
mechanism=mechanism_type,
qualifier=mechanism_qualifier,
)


Expand Down
7 changes: 3 additions & 4 deletions octopoes/octopoes/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def hydrate(node: dict[str, dict | str]) -> dict | str:
if isinstance(value, dict):
node[key] = hydrate(value)
else:
node[key] = natural_key_parts.pop(0)
node[key] = natural_key_parts.pop(0) if natural_key_parts else value
return node

return PrimaryKeyToken.model_validate(hydrate(token_tree))
Expand Down Expand Up @@ -237,8 +237,7 @@ def _serialize_value(self, value: Any, required: bool) -> SerializedOOIValue:
return value.value
if isinstance(value, int | float):
return value
else:
return str(value)
return str(value)

def __hash__(self):
def freeze(items: Iterable[Any | Iterable[Any | None] | None]) -> Iterable[int]:
Expand Down Expand Up @@ -299,7 +298,7 @@ def build_token_tree(ooi_class: type[OOI]) -> dict[str, dict | str]:
# combine trees
tokens[attribute] = {key: value_ for tree in trees for key, value_ in tree.items()}
else:
tokens[attribute] = ""
tokens[attribute] = field.default
return tokens


Expand Down
46 changes: 33 additions & 13 deletions octopoes/octopoes/models/ooi/email_security.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
from enum import Enum
from typing import Literal

from octopoes.models import OOI, Reference
Expand Down Expand Up @@ -30,6 +31,22 @@ def format_reference_human_readable(cls, reference: Reference) -> str:
return f"SPF Record of {reference.tokenized.dns_txt_record.hostname.name}"


class MechanismQualifier(Enum):
ALLOW = "+"
FAIL = "-"
SOFTFAIL = "~"
NEUTRAL = "?"

# the string representation maps to a human readable format of the qualifier
def __str__(self):
return {
MechanismQualifier.ALLOW: "Allow",
MechanismQualifier.FAIL: "Fail",
MechanismQualifier.SOFTFAIL: "Softfail",
MechanismQualifier.NEUTRAL: "Neutral",
}[self]


class DNSSPFMechanism(OOI):
spf_record: Reference = ReferenceField(DNSSPFRecord, max_inherit_scan_level=1)
mechanism: str
Expand All @@ -39,51 +56,54 @@ class DNSSPFMechanismIP(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismIP"] = "DNSSPFMechanismIP"

ip: Reference = ReferenceField(IPAddress)
qualifier: MechanismQualifier = MechanismQualifier.ALLOW

_natural_key_attrs = ["spf_record", "mechanism", "ip"]
_information_value = ["mechanism"]
_natural_key_attrs = ["spf_record", "mechanism", "ip", "qualifier"]
_information_value = ["mechanism", "qualifier"]
_reverse_relation_names = {"spf_record": "spf_ip_mechanisms"}

@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return (
f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
f"{reference.tokenized.mechanism} {reference.tokenized.ip.address}"
f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.ip.address}"
f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
)


class DNSSPFMechanismHostname(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismHostname"] = "DNSSPFMechanismHostname"

hostname: Reference = ReferenceField(Hostname)
qualifier: MechanismQualifier = MechanismQualifier.ALLOW

_natural_key_attrs = ["spf_record", "mechanism", "hostname"]
_information_value = ["mechanism"]
_natural_key_attrs = ["spf_record", "mechanism", "hostname", "qualifier"]
_information_value = ["mechanism", "qualifier"]
_reverse_relation_names = {"spf_record": "spf_hostname_mechanisms"}

@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return (
f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} "
f"{reference.tokenized.mechanism} {reference.tokenized.hostname.name}"
f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.hostname.name}"
f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
)


class DNSSPFMechanismNetBlock(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismNetBlock"] = "DNSSPFMechanismNetBlock"

netblock: Reference = ReferenceField(NetBlock)
qualifier: MechanismQualifier = MechanismQualifier.ALLOW

_natural_key_attrs = ["spf_record", "mechanism", "netblock"]
_information_value = ["mechanism"]
_natural_key_attrs = ["spf_record", "mechanism", "netblock", "qualifier"]
_information_value = ["mechanism", "qualifier"]
_reverse_relation_names = {"spf_record": "spf_netblock_mechanisms"}

@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return (
f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} "
f" {reference.tokenized.mechanism} {reference.tokenized.netblock.start_ip}"
f"/{reference.tokenized.netblock.mask}"
f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:"
f"{reference.tokenized.netblock.start_ip}/{reference.tokenized.netblock.mask}"
f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
)


Expand Down

0 comments on commit b9470e4

Please sign in to comment.