From 724fb392c03c8973e3e04a937e8fcd056b2674f5 Mon Sep 17 00:00:00 2001 From: JP Bruins Slot Date: Thu, 16 Jan 2025 13:10:21 +0100 Subject: [PATCH] Fix task stats --- mula/scheduler/app.py | 4 ---- mula/scheduler/server/handlers/tasks.py | 14 ++++---------- mula/scheduler/storage/stores/task.py | 14 ++++++++++++-- mula/tests/integration/test_api.py | 5 ++++- 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/mula/scheduler/app.py b/mula/scheduler/app.py index f1be55079bf..17a173d100d 100644 --- a/mula/scheduler/app.py +++ b/mula/scheduler/app.py @@ -152,10 +152,6 @@ def _unhandled_exception(self, args: threading.ExceptHookArgs) -> None: def _collect_metrics(self) -> None: """Collect application metrics throughout the application.""" - - # FIXME:: can be queries instead of a loop - # Collect the queue size of the schedulers, and the status counts of - # the tasks for each scheduler. for s in self.schedulers.values(): qsize = self.ctx.datastores.pq_store.qsize(s.scheduler_id) self.ctx.metrics_qsize.labels(scheduler_id=s.scheduler_id).set(qsize) diff --git a/mula/scheduler/server/handlers/tasks.py b/mula/scheduler/server/handlers/tasks.py index 46b6cc7469a..ac933085b8c 100644 --- a/mula/scheduler/server/handlers/tasks.py +++ b/mula/scheduler/server/handlers/tasks.py @@ -34,14 +34,6 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext) -> None: description="Get task status counts for all schedulers in last 24 hours", ) - self.api.add_api_route( - path="/tasks/stats/{scheduler_id}", - endpoint=self.stats, - methods=["GET"], - status_code=status.HTTP_200_OK, - description="Get task status counts for a scheduler in last 24 hours", - ) - self.api.add_api_route( path="/tasks/{task_id}", endpoint=self.get, @@ -163,5 +155,7 @@ def patch(self, task_id: uuid.UUID, item: serializers.Task) -> Any: return updated_task - def stats(self, scheduler_id: str | None = None) -> dict[str, dict[str, int]] | None: - return self.ctx.datastores.task_store.get_status_count_per_hour(scheduler_id) + def stats( + self, scheduler_id: str | None = None, organisation_id: str | None = None + ) -> dict[str, dict[str, int]] | None: + return self.ctx.datastores.task_store.get_status_count_per_hour(scheduler_id, organisation_id) diff --git a/mula/scheduler/storage/stores/task.py b/mula/scheduler/storage/stores/task.py index 437e10ca538..d2a83adcfe8 100644 --- a/mula/scheduler/storage/stores/task.py +++ b/mula/scheduler/storage/stores/task.py @@ -136,7 +136,9 @@ def cancel_tasks(self, scheduler_id: str, task_ids: list[str]) -> None: @retry() @exception_handler - def get_status_count_per_hour(self, scheduler_id: str | None = None) -> dict[str, dict[str, int]] | None: + def get_status_count_per_hour( + self, scheduler_id: str | None = None, organisation_id: str | None = None + ) -> dict[str, dict[str, int]] | None: with self.dbconn.session.begin() as session: query = ( session.query( @@ -152,6 +154,9 @@ def get_status_count_per_hour(self, scheduler_id: str | None = None) -> dict[str if scheduler_id is not None: query = query.filter(models.TaskDB.scheduler_id == scheduler_id) + if organisation_id is not None: + query = query.filter(models.TaskDB.organisation == organisation_id) + results = query.all() response: dict[str, dict[str, int]] = {} @@ -166,7 +171,9 @@ def get_status_count_per_hour(self, scheduler_id: str | None = None) -> dict[str @retry() @exception_handler - def get_status_counts(self, scheduler_id: str | None = None) -> dict[str, int] | None: + def get_status_counts( + self, scheduler_id: str | None = None, organisation_id: str | None = None + ) -> dict[str, int] | None: with self.dbconn.session.begin() as session: query = ( session.query(models.TaskDB.status, func.count(models.TaskDB.id).label("count")) @@ -177,6 +184,9 @@ def get_status_counts(self, scheduler_id: str | None = None) -> dict[str, int] | if scheduler_id is not None: query = query.filter(models.TaskDB.scheduler_id == scheduler_id) + if organisation_id is not None: + query = query.filter(models.TaskDB.organisation == organisation_id) + results = query.all() response = {k.value: 0 for k in models.TaskStatus} diff --git a/mula/tests/integration/test_api.py b/mula/tests/integration/test_api.py index e234facd265..9f8f2d291f4 100644 --- a/mula/tests/integration/test_api.py +++ b/mula/tests/integration/test_api.py @@ -712,7 +712,10 @@ def test_get_tasks_stats(self): response = self.client.get("/tasks/stats") self.assertEqual(200, response.status_code) - response = self.client.get(f"/tasks/stats/{self.first_item_api.get('scheduler_id')}") + response = self.client.get(f"/tasks/stats?scheduler_id={self.first_item_api.get('scheduler_id')}") + self.assertEqual(200, response.status_code) + + response = self.client.get(f"/tasks/stats?organisation_id={self.first_item_api.get('organisation_id')}") self.assertEqual(200, response.status_code)