From 333c848b6f4682b8ed457beb1a385cf4ba52e167 Mon Sep 17 00:00:00 2001 From: jefer94 Date: Wed, 5 Jun 2024 21:56:22 -0500 Subject: [PATCH] add proccesing in batch --- .vscode/settings.json | 15 ++- pyproject.toml | 1 + .../management/commands/task_manager.py | 93 +++++++++++++------ 3 files changed, 78 insertions(+), 31 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index f08f5ba..7c0cfce 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -9,8 +9,15 @@ "editor.codeActionsOnSave": { "source.organizeImports": "explicit" }, - "black-formatter.args": ["--line-length=120"] + "black-formatter.args": [ + "--line-length=120" + ] }, - "isort.args": ["--profile", "black"], - "black-formatter.args": ["--line-length=120"] -} + "isort.args": [ + "--profile", + "black" + ], + "black-formatter.args": [ + "--line-length=120" + ] +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 9342382..df9c45b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ dependencies = [ "pytest-celery", "celery[pytest]", "pytest-asyncio", + "asgiref", ] [tool.hatch.envs.default.scripts] test = "pytest {args:tests} --nomigrations --durations=1" diff --git a/src/task_manager/management/commands/task_manager.py b/src/task_manager/management/commands/task_manager.py index 20d9a1a..4e5bb0b 100644 --- a/src/task_manager/management/commands/task_manager.py +++ b/src/task_manager/management/commands/task_manager.py @@ -1,5 +1,6 @@ import importlib from datetime import datetime, timedelta +from typing import Any from celery import Task @@ -7,6 +8,7 @@ from django.core.management.base import BaseCommand from django.db.models import Q from django.utils import timezone +from django.db.models import QuerySet from task_manager.django import tasks from task_manager.django.models import ScheduledTask @@ -28,6 +30,11 @@ "ERROR", ] +LIMITS = { + "small": 100, + "medium": 1000, +} + def is_report_time(): # Getting the current datetime @@ -56,18 +63,29 @@ def handle(self, *args, **options): # self.daily_report() self.run_scheduled_tasks() + def delete_items(self, qs: QuerySet[Any]): + limit = LIMITS["medium"] + + while True: + to_delete = qs[0:limit] + + if to_delete.count() == 0: + break + + to_delete.delete() + def clean_older_tasks(self): date_limit = self.utc_now - timedelta(days=5) errors = TaskManager.objects.filter(created_at__lt=date_limit, status="ERROR") date_limit = self.utc_now - timedelta(days=2) - alright = TaskManager.objects.filter(created_at__lt=date_limit).exclude(status="ERROR") + alright = TaskManager.objects.filter(created_at__lt=date_limit).exclude(Q(status="ERROR") | Q(status="PENDING")) count_errors = errors.count() count_alright = alright.count() - errors.delete() - alright.delete() + self.delete_items(errors) + self.delete_items(alright) self.stdout.write(self.style.SUCCESS(f"Successfully deleted {str(count_errors + count_alright)} TaskManager's")) @@ -78,44 +96,65 @@ def run_scheduled_tasks(self): cancelled_tasks = ScheduledTask.objects.exclude(status="PENDING") scheduled = 0 - for scheduled_task in scheduled_tasks: - if scheduled_task.task_module not in cache: - cache[scheduled_task.task_module] = {} + limit = LIMITS["small"] + + while True: + scheduled_tasks = ScheduledTask.objects.filter(status="PENDING", eta__lte=self.utc_now)[:limit] + if scheduled_tasks.count() == 0: + break + + for scheduled_task in scheduled_tasks: + if scheduled_task.task_module not in cache: + cache[scheduled_task.task_module] = {} - if scheduled_task.task_name not in cache[scheduled_task.task_module]: - if scheduled_task.task_module not in modules: - modules[scheduled_task.task_module] = importlib.import_module(scheduled_task.task_module) + if scheduled_task.task_name not in cache[scheduled_task.task_module]: + if scheduled_task.task_module not in modules: + modules[scheduled_task.task_module] = importlib.import_module(scheduled_task.task_module) - module = modules[scheduled_task.task_module] - function = getattr(module, scheduled_task.task_name) - cache[scheduled_task.task_module][scheduled_task.task_name] = function + module = modules[scheduled_task.task_module] + function = getattr(module, scheduled_task.task_name) + cache[scheduled_task.task_module][scheduled_task.task_name] = function - handler = cache[scheduled_task.task_module][scheduled_task.task_name] - handler.delay(*scheduled_task.arguments["args"], **scheduled_task.arguments["kwargs"]) - scheduled += 1 + handler = cache[scheduled_task.task_module][scheduled_task.task_name] + handler.delay(*scheduled_task.arguments["args"], **scheduled_task.arguments["kwargs"]) + scheduled += 1 - scheduled_tasks.delete() - cancelled_tasks.delete() + scheduled_tasks.delete() + + self.delete_items(cancelled_tasks) self.stdout.write(self.style.SUCCESS(f"Successfully scheduled {scheduled} Tasks")) def rerun_pending_tasks(self): tolerance = timedelta(minutes=TOLERANCE) - ids = TaskManager.objects.filter(last_run__lt=self.utc_now - tolerance, status="PENDING").values_list( - "id", flat=True - ) + task_managers = TaskManager.objects.filter(last_run__lt=self.utc_now - tolerance, status="PENDING") + if count := task_managers.count() == 0: + msg = self.style.SUCCESS("No TaskManager's available to re-run") + self.stdout.write(self.style.SUCCESS(msg)) + return - for id in ids: - tasks.mark_task_as_pending.delay(id, force=True) + limit = LIMITS["small"] + a = 0 + page = 0 - if ids: - msg = self.style.SUCCESS(f"Rerunning TaskManager's {', '.join([str(id) for id in ids])}") + while True: + if a >= count: + break - else: - msg = self.style.SUCCESS("No TaskManager's available to re-run") + a = page * limit + page += 1 + b = page * limit + + ids = task_managers[a:b].values_list("id", flat=True) + + for id in ids: + tasks.mark_task_as_pending.delay(id, force=True) + + if ids: + msg = self.style.SUCCESS(f"Rerunning TaskManager's {', '.join([str(id) for id in ids])}") - self.stdout.write(self.style.SUCCESS(msg)) + self.stdout.write(self.style.SUCCESS(msg)) def daily_report(self): if not is_report_time():