Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbruinsslot committed Jan 14, 2025
1 parent 46c3a38 commit af19161
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 85 deletions.
45 changes: 19 additions & 26 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,44 +81,37 @@ def run(self, queue_type: WorkerManager.Queue) -> None:
raise

def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue) -> None:
if task_queue.qsize() > self.settings.pool_size:
time.sleep(self.settings.worker_heartbeat)
return

logger.debug("Popping from queue %s", queue_type.value)

try:
response = self.scheduler_client.pop_item(queue_type.value)
p_item = self.scheduler_client.pop_item(queue_type.value)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(10)
time.sleep(self.settings.worker_heartbeat)
return

# TODO: check
if not response:
logger.debug("Queue %s empty", queue_type.value)
time.sleep(10)
if p_item is None:
time.sleep(self.settings.worker_heartbeat)
return

# TODO: check
if response.count == 0:
logger.debug("Queue %s empty", queue_type.value)
time.sleep(10)
return
logger.info("Handling task[%s]", p_item.data.id)

for p_item in response.results:
logger.info("Handling task[%s]", p_item.data.id)
try:
task_queue.put(p_item)
logger.info("Dispatched task[%s]", p_item.data.id)
except: # noqa
logger.exception("Exiting worker...")
logger.info("Patching scheduler task[id=%s] to %s", p_item.data.id, TaskStatus.FAILED.value)

try:
task_queue.put(p_item)
logger.info("Dispatched task[%s]", p_item.data.id)
except: # noqa
logger.exception("Exiting worker...")
logger.info("Patching scheduler task[id=%s] to %s", p_item.data.id, TaskStatus.FAILED.value)

try:
self.scheduler_client.patch_task(p_item.id, TaskStatus.FAILED)
logger.info(
"Set task status to %s in the scheduler for task[id=%s]", TaskStatus.FAILED, p_item.data.id
)
except HTTPError:
logger.exception("Could not patch scheduler task to %s", TaskStatus.FAILED.value)
self.scheduler_client.patch_task(p_item.id, TaskStatus.FAILED)
logger.info("Set task status to %s in the scheduler for task[id=%s]", TaskStatus.FAILED, p_item.data.id)
except HTTPError:
logger.exception("Could not patch scheduler task to %s", TaskStatus.FAILED.value)

def _check_workers(self) -> None:
new_workers = []
Expand Down
13 changes: 4 additions & 9 deletions boefjes/boefjes/clients/scheduler_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class SchedulerClientInterface:
def get_queues(self) -> list[Queue]:
raise NotImplementedError()

def pop_item(self, scheduler_id: str) -> PaginatedTasksResponse | None:
def pop_item(self, scheduler_id: str) -> Task | None:
raise NotImplementedError()

def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None:
Expand All @@ -78,17 +78,12 @@ def __init__(self, base_url: str):
def _verify_response(response: Response) -> None:
response.raise_for_status()

def get_queues(self) -> list[Queue]:
response = self._session.get("/queues")
self._verify_response(response)

return TypeAdapter(list[Queue]).validate_json(response.content)

def pop_item(self, scheduler_id: str) -> PaginatedTasksResponse | None:
def pop_item(self, scheduler_id: str) -> Task | None:
response = self._session.post(f"/schedulers/{scheduler_id}/pop?limit=1")
self._verify_response(response)

return TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)
page = TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)
return TypeAdapter(Task | None).validate_json(page.results[0]) if page else None

def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None:
response = self._session.post(f"/schedulers/{scheduler_id}/pop", json=filters)
Expand Down
88 changes: 47 additions & 41 deletions boefjes/tests/examples/scheduler/should_crash.json
Original file line number Diff line number Diff line change
@@ -1,43 +1,49 @@
{
"id": "9071c9fd-2b9f-440f-a524-ef1ca4824fd4",
"priority": 1,
"scheduler_id": "boefje-_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "7e698c377cfd85015c0d7086b76b76b4",
"data": {
"id": "9071c9fd-2b9f-440f-a524-ef1ca4824fd4",
"boefje": {
"id": "dns-records",
"name": "DnsRecords",
"description": "Fetch the DNS record(s) of a hostname",
"version": null,
"scan_level": 1,
"consumes": [
"Hostname"
],
"produces": [
"DNSAAAARecord",
"IPAddressV6",
"NXDOMAIN",
"Hostname",
"Network",
"DNSNSRecord",
"DNSTXTRecord",
"IPAddressV4",
"DNSMXRecord",
"DNSZone",
"DNSARecord",
"DNSSOARecord",
"DNSCNAMERecord"
],
"dispatches": null
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"dispatches": []
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
"count": 1,
"next": null,
"previous": null,
"results": [
{
"id": "9071c9fd-2b9f-440f-a524-ef1ca4824fd4",
"priority": 1,
"scheduler_id": "boefje",
"organisation": "_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "7e698c377cfd85015c0d7086b76b76b4",
"data": {
"id": "9071c9fd-2b9f-440f-a524-ef1ca4824fd4",
"boefje": {
"id": "dns-records",
"name": "DnsRecords",
"description": "Fetch the DNS record(s) of a hostname",
"version": null,
"scan_level": 1,
"consumes": ["Hostname"],
"produces": [
"DNSAAAARecord",
"IPAddressV6",
"NXDOMAIN",
"Hostname",
"Network",
"DNSNSRecord",
"DNSTXTRecord",
"IPAddressV4",
"DNSMXRecord",
"DNSZone",
"DNSARecord",
"DNSSOARecord",
"DNSCNAMERecord"
],
"dispatches": null
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"dispatches": []
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
}
]
}
15 changes: 6 additions & 9 deletions boefjes/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ def test_two_processes_cleanup_unfinished_tasks(
"""

manager.scheduler_client = MockSchedulerClient(
get_dummy_data("scheduler/queues_response.json"),
3 * [get_dummy_data("scheduler/pop_response_boefje.json")],
[],
tmp_path / "patch_task_log",
3 * [get_dummy_data("scheduler/pop_response_boefje.json")], [], tmp_path / "patch_task_log"
)
manager.settings.pool_size = 2
manager.task_queue = Manager().Queue()
Expand All @@ -151,10 +148,11 @@ def test_two_processes_cleanup_unfinished_tasks(
}

# Tasks (one with the same id) was still unhandled the queue and pushed back to the scheduler by the main process
assert manager.scheduler_client._pushed_items["70da7d4f-f41f-4940-901b-d98a92e9014b"].scheduler_id == "boefje-_dev"
assert json.loads(
manager.scheduler_client._pushed_items["70da7d4f-f41f-4940-901b-d98a92e9014b"].json()
) == json.loads(get_dummy_data("scheduler/pop_response_boefje.json"))
assert manager.scheduler_client._pushed_items["70da7d4f-f41f-4940-901b-d98a92e9014b"].scheduler_id == "boefje"
assert (
json.loads(manager.scheduler_client._pushed_items["70da7d4f-f41f-4940-901b-d98a92e9014b"].json())
== json.loads(get_dummy_data("scheduler/pop_response_boefje.json")).get("results")[0]
)


def test_normalizer_queue(manager: SchedulerWorkerManager, item_handler: MockHandler) -> None:
Expand All @@ -168,7 +166,6 @@ def test_normalizer_queue(manager: SchedulerWorkerManager, item_handler: MockHan

def test_null(manager: SchedulerWorkerManager, tmp_path: Path, item_handler: MockHandler):
manager.scheduler_client = MockSchedulerClient(
get_dummy_data("scheduler/queues_response.json"),
3 * [get_dummy_data("scheduler/pop_response_boefje.json")],
[get_dummy_data("scheduler/pop_response_normalizer.json")],
tmp_path / "patch_task_log",
Expand Down

0 comments on commit af19161

Please sign in to comment.