Skip to content

Commit

Permalink
Allow creation of declared scan profiles through normalizers (#2428)
Browse files Browse the repository at this point in the history
Signed-off-by: Donny Peeters <[email protected]>
Co-authored-by: Jan Klopper <[email protected]>
  • Loading branch information
Donnype and underdarknl authored Feb 6, 2024
1 parent 1b38990 commit c2e1785
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 31 deletions.
8 changes: 5 additions & 3 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
TaskStatus,
)
from boefjes.config import Settings
from boefjes.job_handler import BoefjeHandler, NormalizerHandler
from boefjes.job_handler import BoefjeHandler, NormalizerHandler, bytes_api_client
from boefjes.katalogus.local_repository import get_local_repository
from boefjes.local import LocalBoefjeJobRunner, LocalNormalizerJobRunner
from boefjes.runtime_interfaces import Handler, WorkerManager
Expand Down Expand Up @@ -254,9 +254,11 @@ def _start_working(
def get_runtime_manager(settings: Settings, queue: WorkerManager.Queue, log_level: str) -> WorkerManager:
local_repository = get_local_repository()
if queue is WorkerManager.Queue.BOEFJES:
item_handler = BoefjeHandler(LocalBoefjeJobRunner(local_repository), local_repository)
item_handler = BoefjeHandler(LocalBoefjeJobRunner(local_repository), local_repository, bytes_api_client)
else:
item_handler = NormalizerHandler(LocalNormalizerJobRunner(local_repository))
item_handler = NormalizerHandler(
LocalNormalizerJobRunner(local_repository), bytes_api_client, settings.scan_profile_whitelist
)

return SchedulerWorkerManager(
item_handler,
Expand Down
8 changes: 7 additions & 1 deletion boefjes/boefjes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Type

from pydantic import AmqpDsn, AnyHttpUrl, Field, FilePath, IPvAnyAddress, PostgresDsn
from pydantic import AmqpDsn, AnyHttpUrl, Field, FilePath, IPvAnyAddress, PostgresDsn, conint
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict
from pydantic_settings.sources import EnvSettingsSource

Expand Down Expand Up @@ -52,6 +52,12 @@ class Settings(BaseSettings):
"1.1.1.1", description="Name server used for remote DNS resolution in the boefje runner"
)

scan_profile_whitelist: Dict[str, conint(strict=True, ge=0, le=4)] = Field(
default_factory=dict,
description="Whitelist for normalizer ids allowed to produce scan profiles, including a maximum level.",
examples=['{"kat_external_db_normalize": 3, "kat_dns_normalize": 1}'],
)

# Queue configuration
queue_uri: AmqpDsn = Field(..., description="KAT queue URI", examples=["amqp://"], validation_alias="QUEUE_URI")

Expand Down
72 changes: 57 additions & 15 deletions boefjes/boefjes/job_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import traceback
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

import requests
from pydantic.tools import parse_obj_as
Expand All @@ -16,13 +16,14 @@
BoefjeMeta,
NormalizerMeta,
NormalizerPlainOOI,
NormalizerScanProfile,
)
from boefjes.katalogus.local_repository import LocalPluginRepository
from boefjes.plugins.models import _default_mime_types
from boefjes.runtime_interfaces import BoefjeJobRunner, Handler, NormalizerJobRunner
from octopoes.api.models import Declaration, Observation
from octopoes.connector.octopoes import OctopoesAPIConnector
from octopoes.models import OOI, Reference
from octopoes.models import OOI, Reference, ScanProfile
from octopoes.models.exception import ObjectNotFoundException
from octopoes.models.types import OOIType

Expand Down Expand Up @@ -81,6 +82,10 @@ def serialize_ooi(ooi: OOI):
return serialized_oois


def get_octopoes_api_connector(org_code: str):
return OctopoesAPIConnector(str(settings.octopoes_api), org_code)


def get_environment_settings(boefje_meta: BoefjeMeta, environment_keys: List[str]) -> Dict[str, str]:
try:
katalogus_api = str(settings.katalogus_api).rstrip("/")
Expand All @@ -106,9 +111,17 @@ def get_environment_settings(boefje_meta: BoefjeMeta, environment_keys: List[str


class BoefjeHandler(Handler):
def __init__(self, job_runner, local_repository: LocalPluginRepository):
self.job_runner: BoefjeJobRunner = job_runner
self.local_repository: LocalPluginRepository = local_repository
def __init__(
self,
job_runner: BoefjeJobRunner,
local_repository: LocalPluginRepository,
bytes_client: BytesAPIClient,
octopoes_factory=get_octopoes_api_connector,
):
self.job_runner = job_runner
self.local_repository = local_repository
self.bytes_client = bytes_client
self.octopoes_factory = octopoes_factory

def handle(self, boefje_meta: BoefjeMeta) -> None:
logger.info("Handling boefje %s[task_id=%s]", boefje_meta.boefje.id, str(boefje_meta.id))
Expand All @@ -129,7 +142,7 @@ def handle(self, boefje_meta: BoefjeMeta) -> None:
boefje_meta.arguments["input"] = serialize_ooi(
_find_ooi_in_past(
Reference.from_str(boefje_meta.input_ooi),
get_octopoes_api_connector(boefje_meta.organization),
self.octopoes_factory(boefje_meta.organization),
)
)

Expand Down Expand Up @@ -157,11 +170,11 @@ def handle(self, boefje_meta: BoefjeMeta) -> None:
boefje_meta.ended_at = datetime.now(timezone.utc)
logger.info("Saving to Bytes for boefje %s[%s]", boefje_meta.boefje.id, str(boefje_meta.id))

bytes_api_client.save_boefje_meta(boefje_meta)
self.bytes_client.save_boefje_meta(boefje_meta)

if boefje_results:
for boefje_added_mime_types, output in boefje_results:
raw_file_id = bytes_api_client.save_raw(
raw_file_id = self.bytes_client.save_raw(
boefje_meta.id, output, mime_types.union(boefje_added_mime_types)
)
logger.debug(
Expand All @@ -172,19 +185,28 @@ def handle(self, boefje_meta: BoefjeMeta) -> None:


class NormalizerHandler(Handler):
def __init__(self, job_runner):
self.job_runner: NormalizerJobRunner = job_runner
def __init__(
self,
job_runner: NormalizerJobRunner,
bytes_client: BytesAPIClient,
octopoes_factory=get_octopoes_api_connector,
whitelist: Optional[Dict[str, int]] = None,
):
self.job_runner = job_runner
self.bytes_client: BytesAPIClient = bytes_client
self.octopoes_factory = octopoes_factory
self.whitelist = whitelist

def handle(self, normalizer_meta: NormalizerMeta) -> None:
logger.info("Handling normalizer %s[%s]", normalizer_meta.normalizer.id, normalizer_meta.id)

raw = bytes_api_client.get_raw(normalizer_meta.raw_data.id)
raw = self.bytes_client.get_raw(normalizer_meta.raw_data.id)

normalizer_meta.started_at = datetime.now(timezone.utc)

try:
results = self.job_runner.run(normalizer_meta, raw)
connector = get_octopoes_api_connector(normalizer_meta.raw_data.boefje_meta.organization)
connector = self.octopoes_factory(normalizer_meta.raw_data.boefje_meta.organization)

for observation in results.observations:
reference = Reference.from_str(observation.input_ooi)
Expand All @@ -207,16 +229,36 @@ def handle(self, normalizer_meta: NormalizerMeta) -> None:
valid_time=normalizer_meta.raw_data.boefje_meta.ended_at,
)
)

corrected_scan_profiles = []
for profile in results.scan_profiles:
profile.level = min(profile.level, self.whitelist.get(normalizer_meta.normalizer.id, profile.level))
corrected_scan_profiles.append(profile)

validated_scan_profiles = [
profile
for profile in corrected_scan_profiles
if self.whitelist and profile.level <= self.whitelist.get(normalizer_meta.normalizer.id, -1)
]
if validated_scan_profiles:
connector.save_many_scan_profiles(
[self._parse_scan_profile(scan_profile) for scan_profile in results.scan_profiles],
valid_time=normalizer_meta.raw_data.boefje_meta.ended_at,
)
finally:
normalizer_meta.ended_at = datetime.now(timezone.utc)
bytes_api_client.save_normalizer_meta(normalizer_meta)
self.bytes_client.save_normalizer_meta(normalizer_meta)

logger.info("Done with normalizer %s[%s]", normalizer_meta.normalizer.id, normalizer_meta.id)

@staticmethod
def _parse_ooi(result: NormalizerPlainOOI):
return parse_obj_as(OOIType, result.model_dump())

@staticmethod
def _parse_scan_profile(result: NormalizerScanProfile):
return parse_obj_as(ScanProfile, result.model_dump())

def get_octopoes_api_connector(org_code: str):
return OctopoesAPIConnector(str(settings.octopoes_api), org_code)

class InvalidWhitelist(Exception):
pass
8 changes: 7 additions & 1 deletion boefjes/boefjes/job_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,16 @@ class NormalizerDeclaration(BaseModel):
ooi: NormalizerPlainOOI


class NormalizerScanProfile(BaseModel):
scan_profile_type: str
model_config = ConfigDict(populate_by_name=True, extra="allow")


class NormalizerResult(BaseModel): # Moves all validation logic to Pydantic
item: Union[NormalizerPlainOOI, NormalizerObservation, NormalizerDeclaration]
item: Union[NormalizerPlainOOI, NormalizerObservation, NormalizerDeclaration, NormalizerScanProfile]


class NormalizerOutput(BaseModel):
observations: List[NormalizerObservation] = []
declarations: List[NormalizerDeclaration] = []
scan_profiles: List[NormalizerScanProfile] = []
8 changes: 5 additions & 3 deletions boefjes/boefjes/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
NormalizerOutput,
NormalizerPlainOOI,
NormalizerResult,
NormalizerScanProfile,
ObservationsWithoutInputOOI,
UnsupportedReturnTypeNormalizer,
)
from boefjes.katalogus.local_repository import LocalPluginRepository
from boefjes.runtime_interfaces import BoefjeJobRunner, JobRuntimeError, NormalizerJobRunner
from octopoes.models import OOI
from octopoes.models import OOI, DeclaredScanProfile

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -98,12 +99,13 @@ def _parse_results(self, normalizer_meta: NormalizerMeta, results: List[Any]) ->
return NormalizerOutput(
observations=observations,
declarations=[result.item for result in parsed if isinstance(result.item, NormalizerDeclaration)],
scan_profiles=[result.item for result in parsed if isinstance(result.item, NormalizerScanProfile)],
)

@staticmethod
def _parse(result: Any) -> NormalizerResult:
if not isinstance(result, dict): # Must be an OOI. This should be phased out together with Octopoes
if not isinstance(result, OOI):
if not isinstance(result, dict): # Must be an OOI or ScanProfile. Should be phased out with Octopoes dependency
if not isinstance(result, (OOI, DeclaredScanProfile)):
raise UnsupportedReturnTypeNormalizer(str(type(result)))

result = result.dict()
Expand Down
2 changes: 1 addition & 1 deletion boefjes/boefjes/plugins/kat_external_db/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Boefje script for getting domaions and ipaddresses from dadb"""
"""Boefje script for getting domains and ipaddresses from dadb"""
from os import getenv
from typing import List, Tuple, Union

Expand Down
13 changes: 10 additions & 3 deletions boefjes/boefjes/plugins/kat_external_db/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Iterator, Union

from boefjes.job_models import NormalizerMeta
from octopoes.models import OOI
from octopoes.models import OOI, DeclaredScanProfile
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.network import IPAddressV4, IPAddressV6, IPV4NetBlock, IPV6NetBlock, Network

Expand Down Expand Up @@ -50,18 +50,25 @@ def run(normalizer_meta: NormalizerMeta, raw: Union[bytes, str]) -> Iterator[OOI

ip_address = address_type(address=address, network=network.reference)
yield ip_address
yield DeclaredScanProfile(reference=ip_address.reference, level=3)
addresses_count += 1

if mask < interface.ip.max_prefixlen:
yield block_type(
block = block_type(
start_ip=ip_address.reference,
mask=mask,
network=network.reference,
)
yield block
yield DeclaredScanProfile(reference=block.reference, level=3)
blocks_count += 1

for hostname in follow_path_in_dict(path=DOMAIN_LIST_PATH, path_dict=results):
yield Hostname(name=follow_path_in_dict(path=DOMAIN_ITEM_PATH, path_dict=hostname), network=network.reference)
hostname = Hostname(
name=follow_path_in_dict(path=DOMAIN_ITEM_PATH, path_dict=hostname), network=network.reference
)
yield hostname
yield DeclaredScanProfile(reference=hostname.reference, level=3)
hostnames_count += 1

logging.info(
Expand Down
25 changes: 25 additions & 0 deletions boefjes/tests/examples/external_db.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"id": "f50de284-d3c1-4f87-ba68-07b957b7a48f",
"raw_data": {
"id": "66451567-2381-4248-b5a1-d0d0985e065f",
"boefje_meta": {
"id": "f29f76c5-f7b1-4662-89c6-6dc313a9f93f",
"boefje": {
"id": "external_db"
},
"input_ooi": "Network|internet",
"arguments": {"input": {"name": "internet"}},
"organization": "tst",
"started_at": "2010-07-27T11:26:42.679000+00:00",
"ended_at": "2010-07-27T11:26:48.679000+00:00"
},
"mime_types": [
{
"value": "external_db"
}
]
},
"normalizer": {
"id": "kat_external_db_normalize"
}
}
Loading

0 comments on commit c2e1785

Please sign in to comment.