Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port internetnl bit → nibble #4026

Merged
merged 23 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion octopoes/.ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
args:
ENVIRONMENT: dev
context: .
command: pytest tests/integration --timeout=300
command: pytest -s tests/integration/test_internetnl_nibble.py --timeout=300
originalsouth marked this conversation as resolved.
Show resolved Hide resolved
depends_on:
- xtdb
- ci_octopoes
Expand Down
15 changes: 0 additions & 15 deletions octopoes/bits/https_availability/bit.py

This file was deleted.

21 changes: 0 additions & 21 deletions octopoes/bits/https_availability/https_availability.py

This file was deleted.

24 changes: 0 additions & 24 deletions octopoes/bits/internetnl/bit.py

This file was deleted.

50 changes: 0 additions & 50 deletions octopoes/bits/internetnl/internetnl.py

This file was deleted.

15 changes: 14 additions & 1 deletion octopoes/nibbles/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@


class NibbleParameter(BaseModel):
object_type: type[Any]
object_type: Any
parser: str = "[]"
optional: bool = False
additional: set[type[OOI]] = set()

def __eq__(self, other):
if isinstance(other, NibbleParameter):
Expand All @@ -31,12 +32,20 @@ def __eq__(self, other):
else:
return False

@property
def triggers(self) -> set[type[OOI]]:
if isinstance(self.object_type, type) and issubclass(self.object_type, OOI):
return {self.object_type} | self.additional
else:
return self.additional


class NibbleDefinition(BaseModel):
id: str
signature: list[NibbleParameter]
query: str | Callable[[list[Reference | None]], str] | None = None
enabled: bool = True
additional: set[type[OOI]] = set()
_payload: MethodType | None = None
_checksum: str | None = None

Expand All @@ -53,6 +62,10 @@ def __hash__(self):
def _ini(self) -> dict[str, Any]:
return {"id": self.id, "enabled": self.enabled, "checksum": self._checksum}

@property
def triggers(self) -> set[type[OOI]]:
return set.union(*[sgn.triggers for sgn in self.signature]) | self.additional


def get_nibble_definitions() -> dict[str, NibbleDefinition]:
nibble_definitions = {}
Expand Down
2 changes: 1 addition & 1 deletion octopoes/nibbles/disallowed_csp_hostnames/nibble.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def query(targets: list[Reference | None]) -> str:


NIBBLE = NibbleDefinition(
id="disallowed-csp-hostnames",
id="disallowed_csp_hostnames",
signature=[
NibbleParameter(object_type=HTTPHeaderHostname, parser="[*][?object_type == 'HTTPHeaderHostname'][]"),
NibbleParameter(object_type=Config, parser="[*][?object_type == 'Config'][]", optional=True),
Expand Down
19 changes: 19 additions & 0 deletions octopoes/nibbles/https_availability/https_availability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from collections.abc import Iterator

from octopoes.models import OOI
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import IPAddress, IPPort
from octopoes.models.ooi.web import Website


def nibble(ipaddress: IPAddress, ipport80: IPPort, website: Website, port443s: int) -> Iterator[OOI]:
_ = ipaddress
_ = ipport80
if port443s < 1:
ft = KATFindingType(id="KAT-HTTPS-NOT-AVAILABLE")
yield ft
yield Finding(
ooi=website.reference,
finding_type=ft.reference,
description="HTTP port is open, but HTTPS port is not open",
)
66 changes: 66 additions & 0 deletions octopoes/nibbles/https_availability/nibble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from nibbles.definitions import NibbleDefinition, NibbleParameter
from octopoes.models import Reference
from octopoes.models.ooi.network import IPAddress, IPPort
from octopoes.models.ooi.web import Website


def query(targets: list[Reference | None]) -> str:
def pull(statements: list[str]) -> str:
return f"""
{{
:query {{
:find [(pull ?ipaddress [*]) (pull ?ipport80 [*]) (pull ?website [*]) (- (count ?ipport443) 1)]
:where [
{" ".join(statements)}
]
}}
}}
"""

base_query = [
"""
[?website :Website/ip_service ?ip_service]
[?ipservice :IPService/ip_port ?ipport80]
[?ipport80 :IPPort/port 80]
[?ipport80 :IPPort/address ?ipaddress]
(or
(and [?ipport443 :IPPort/address ?ipaddress][?ipport443 :IPPort/port 443])
[(identity nil) ?ipport443]
)
"""
]

ref_queries = [
f'[?ipaddress :IPAddress/primary_key "{str(targets[0])}"]',
f'[?ipport80 :IPPort/primary_key "{str(targets[1])}"]',
f'[?website :Website/primary_key "{str(targets[2])}"]',
]

sgn = "".join(str(int(isinstance(target, Reference))) for target in targets)
if sgn == "1000":
return pull(ref_queries[0:1] + base_query)
elif sgn == "0100":
if int(str(targets[1]).split("|")[-1]) == 80:
return pull(ref_queries[1:2] + base_query)
else:
return pull(base_query)
elif sgn == "0010":
return pull(ref_queries[2:3] + base_query)
elif sgn == "1110":
return pull(ref_queries + base_query)
else:
return pull(base_query)


NIBBLE = NibbleDefinition(
id="https_availability",
signature=[
NibbleParameter(
object_type=IPAddress, parser="[*][?object_type == 'IPAddressV6' || object_type == 'IPAddressV4'][]"
),
NibbleParameter(object_type=IPPort, parser="[*][?object_type == 'IPPort'][]"),
NibbleParameter(object_type=Website, parser="[*][?object_type == 'Website'][]"),
NibbleParameter(object_type=int, parser="[*][-1][]"),
],
query=query,
)
18 changes: 18 additions & 0 deletions octopoes/nibbles/internetnl/internetnl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from collections.abc import Iterator

from octopoes.models import OOI
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.findings import Finding, KATFindingType


def nibble(hostname: Hostname, findings: list[Finding]) -> Iterator[OOI]:
result = "\n".join([str(finding.description) for finding in findings])

if result:
ft = KATFindingType(id="KAT-INTERNETNL")
yield ft
yield Finding(
finding_type=ft.reference,
ooi=hostname.reference,
description=f"This hostname has at least one website with the following finding(s): {result}",
)
101 changes: 101 additions & 0 deletions octopoes/nibbles/internetnl/nibble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from nibbles.definitions import NibbleDefinition, NibbleParameter
from octopoes.models import Reference
from octopoes.models.ooi.dns.zone import Hostname, Network
from octopoes.models.ooi.findings import Finding

finding_types = [
"KAT-WEBSERVER-NO-IPV6",
"KAT-NAMESERVER-NO-TWO-IPV6",
"KAT-NO-DNSSEC",
"KAT-INVALID-DNSSEC",
"KAT-NO-HSTS",
"KAT-NO-CSP",
"KAT-NO-X-FRAME-OPTIONS",
"KAT-NO-X-CONTENT-TYPE-OPTIONS",
"KAT-CSP-VULNERABILITIES",
"KAT-HSTS-VULNERABILITIES",
"KAT-NO-CERTIFICATE",
"KAT-HTTPS-NOT-AVAILABLE",
"KAT-SSL-CERT-HOSTNAME-MISMATCH",
"KAT-HTTPS-REDIRECT",
]


def or_finding_types() -> str:
clauses = "".join([f'[?finding :Finding/finding_type "{ft}"]' for ft in finding_types])
return f"(or {clauses})"


def query(targets: list[Reference | None]) -> str:
def pull(statements: list[str]) -> str:
return f"""
{{
:query {{
:find [
(pull ?hostname [*])
(pull ?website [*])
(pull ?finding [*])
]
:where [
{" ".join(statements)}
]
}}
}}
"""

base_query = [
"""
[?hostname :object_type "Hostname"]
[?website :Website/hostname ?hostname]
(or-join [?finding]
[?finding :Finding/ooi ?hostname]
(and
[?hostnamehttpurl :HostnameHTTPURL/netloc ?hostname]
[?finding :Finding/ooi ?hostnamehttpurl]
)
[?finding :Finding/ooi ?website]
(and
[?resource :HTTPResource/website ?website]
[?finding :Finding/ooi ?resource]
)
(and
[?header :HTTPHeader/resource ?resource]
[?resource :HTTPResource/website ?website]
[?finding :Finding/ooi ?header]
)
)
"""
]

sgn = "".join(str(int(isinstance(target, Reference))) for target in targets)
ref_query = ["[?hostname :Hostname/primary_key]"]
if sgn == "100":
ref_query = [f'[?hostname :Hostname/primary_key "{str(targets[0])}"]']
elif sgn == "010":
ref_query = [f'[?hostname :Hostname/primary_key "{str(targets[1]).split("|")[-1]}"]']
elif sgn == "001":
tokens = str(targets[2]).split("|")[1:-1]
target_reference = Reference.from_str("|".join(tokens))
if tokens[0] == "Hostname":
hostname = target_reference.tokenized
elif tokens[0] in {"HostnameHTTPURL", "Website"}:
hostname = target_reference.tokenized.hostname
elif tokens[0] == "HTTPResource":
hostname = target_reference.tokenized.website.hostname
elif tokens[0] == "HTTPHeader":
hostname = target_reference.tokenized.resource.website.hostname
else:
raise ValueError()
hostname_pk = Hostname(name=hostname.name, network=Network(name=hostname.network.name).reference).reference
ref_query = [f'[?hostname :Hostname/primary_key "{str(hostname_pk)}"]']
return pull(ref_query + base_query)


NIBBLE = NibbleDefinition(
id="internet_nl",
signature=[
NibbleParameter(object_type=Hostname, parser="[*][?object_type == 'Hostname'][]"),
NibbleParameter(object_type=list[Finding], parser="[[*][?object_type == 'Finding'][]]", additional={Finding}),
],
query=query,
)
Loading
Loading