Skip to content

Commit

Permalink
Fix task stats
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbruinsslot committed Jan 16, 2025
1 parent b31c366 commit 724fb39
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 17 deletions.
4 changes: 0 additions & 4 deletions mula/scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,6 @@ def _unhandled_exception(self, args: threading.ExceptHookArgs) -> None:

def _collect_metrics(self) -> None:
"""Collect application metrics throughout the application."""

# FIXME:: can be queries instead of a loop
# Collect the queue size of the schedulers, and the status counts of
# the tasks for each scheduler.
for s in self.schedulers.values():
qsize = self.ctx.datastores.pq_store.qsize(s.scheduler_id)
self.ctx.metrics_qsize.labels(scheduler_id=s.scheduler_id).set(qsize)
Expand Down
14 changes: 4 additions & 10 deletions mula/scheduler/server/handlers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext) -> None:
description="Get task status counts for all schedulers in last 24 hours",
)

self.api.add_api_route(
path="/tasks/stats/{scheduler_id}",
endpoint=self.stats,
methods=["GET"],
status_code=status.HTTP_200_OK,
description="Get task status counts for a scheduler in last 24 hours",
)

self.api.add_api_route(
path="/tasks/{task_id}",
endpoint=self.get,
Expand Down Expand Up @@ -163,5 +155,7 @@ def patch(self, task_id: uuid.UUID, item: serializers.Task) -> Any:

return updated_task

def stats(self, scheduler_id: str | None = None) -> dict[str, dict[str, int]] | None:
return self.ctx.datastores.task_store.get_status_count_per_hour(scheduler_id)
def stats(
self, scheduler_id: str | None = None, organisation_id: str | None = None
) -> dict[str, dict[str, int]] | None:
return self.ctx.datastores.task_store.get_status_count_per_hour(scheduler_id, organisation_id)
14 changes: 12 additions & 2 deletions mula/scheduler/storage/stores/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ def cancel_tasks(self, scheduler_id: str, task_ids: list[str]) -> None:

@retry()
@exception_handler
def get_status_count_per_hour(self, scheduler_id: str | None = None) -> dict[str, dict[str, int]] | None:
def get_status_count_per_hour(
self, scheduler_id: str | None = None, organisation_id: str | None = None
) -> dict[str, dict[str, int]] | None:
with self.dbconn.session.begin() as session:
query = (
session.query(
Expand All @@ -152,6 +154,9 @@ def get_status_count_per_hour(self, scheduler_id: str | None = None) -> dict[str
if scheduler_id is not None:
query = query.filter(models.TaskDB.scheduler_id == scheduler_id)

if organisation_id is not None:
query = query.filter(models.TaskDB.organisation == organisation_id)

results = query.all()

response: dict[str, dict[str, int]] = {}
Expand All @@ -166,7 +171,9 @@ def get_status_count_per_hour(self, scheduler_id: str | None = None) -> dict[str

@retry()
@exception_handler
def get_status_counts(self, scheduler_id: str | None = None) -> dict[str, int] | None:
def get_status_counts(
self, scheduler_id: str | None = None, organisation_id: str | None = None
) -> dict[str, int] | None:
with self.dbconn.session.begin() as session:
query = (
session.query(models.TaskDB.status, func.count(models.TaskDB.id).label("count"))
Expand All @@ -177,6 +184,9 @@ def get_status_counts(self, scheduler_id: str | None = None) -> dict[str, int] |
if scheduler_id is not None:
query = query.filter(models.TaskDB.scheduler_id == scheduler_id)

if organisation_id is not None:
query = query.filter(models.TaskDB.organisation == organisation_id)

results = query.all()

response = {k.value: 0 for k in models.TaskStatus}
Expand Down
5 changes: 4 additions & 1 deletion mula/tests/integration/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,10 @@ def test_get_tasks_stats(self):
response = self.client.get("/tasks/stats")
self.assertEqual(200, response.status_code)

response = self.client.get(f"/tasks/stats/{self.first_item_api.get('scheduler_id')}")
response = self.client.get(f"/tasks/stats?scheduler_id={self.first_item_api.get('scheduler_id')}")
self.assertEqual(200, response.status_code)

response = self.client.get(f"/tasks/stats?organisation_id={self.first_item_api.get('organisation_id')}")
self.assertEqual(200, response.status_code)


Expand Down

0 comments on commit 724fb39

Please sign in to comment.