Skip to content

Commit

Permalink
add proccesing in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
jefer94 committed Jun 6, 2024
1 parent 870ad1a commit 333c848
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 31 deletions.
15 changes: 11 additions & 4 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
},
"black-formatter.args": ["--line-length=120"]
"black-formatter.args": [
"--line-length=120"
]
},
"isort.args": ["--profile", "black"],
"black-formatter.args": ["--line-length=120"]
}
"isort.args": [
"--profile",
"black"
],
"black-formatter.args": [
"--line-length=120"
]
}
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies = [
"pytest-celery",
"celery[pytest]",
"pytest-asyncio",
"asgiref",
]
[tool.hatch.envs.default.scripts]
test = "pytest {args:tests} --nomigrations --durations=1"
Expand Down
93 changes: 66 additions & 27 deletions src/task_manager/management/commands/task_manager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import importlib
from datetime import datetime, timedelta
from typing import Any

from celery import Task

# from breathecode.notify.actions import send_email_message
from django.core.management.base import BaseCommand
from django.db.models import Q
from django.utils import timezone
from django.db.models import QuerySet

from task_manager.django import tasks
from task_manager.django.models import ScheduledTask
Expand All @@ -28,6 +30,11 @@
"ERROR",
]

LIMITS = {
"small": 100,
"medium": 1000,
}


def is_report_time():
# Getting the current datetime
Expand Down Expand Up @@ -56,18 +63,29 @@ def handle(self, *args, **options):
# self.daily_report()
self.run_scheduled_tasks()

def delete_items(self, qs: QuerySet[Any]):
limit = LIMITS["medium"]

while True:
to_delete = qs[0:limit]

if to_delete.count() == 0:
break

to_delete.delete()

def clean_older_tasks(self):

date_limit = self.utc_now - timedelta(days=5)
errors = TaskManager.objects.filter(created_at__lt=date_limit, status="ERROR")

date_limit = self.utc_now - timedelta(days=2)
alright = TaskManager.objects.filter(created_at__lt=date_limit).exclude(status="ERROR")
alright = TaskManager.objects.filter(created_at__lt=date_limit).exclude(Q(status="ERROR") | Q(status="PENDING"))

count_errors = errors.count()
count_alright = alright.count()
errors.delete()
alright.delete()
self.delete_items(errors)
self.delete_items(alright)

self.stdout.write(self.style.SUCCESS(f"Successfully deleted {str(count_errors + count_alright)} TaskManager's"))

Expand All @@ -78,44 +96,65 @@ def run_scheduled_tasks(self):
cancelled_tasks = ScheduledTask.objects.exclude(status="PENDING")
scheduled = 0

for scheduled_task in scheduled_tasks:
if scheduled_task.task_module not in cache:
cache[scheduled_task.task_module] = {}
limit = LIMITS["small"]

while True:
scheduled_tasks = ScheduledTask.objects.filter(status="PENDING", eta__lte=self.utc_now)[:limit]
if scheduled_tasks.count() == 0:
break

for scheduled_task in scheduled_tasks:
if scheduled_task.task_module not in cache:
cache[scheduled_task.task_module] = {}

if scheduled_task.task_name not in cache[scheduled_task.task_module]:
if scheduled_task.task_module not in modules:
modules[scheduled_task.task_module] = importlib.import_module(scheduled_task.task_module)
if scheduled_task.task_name not in cache[scheduled_task.task_module]:
if scheduled_task.task_module not in modules:
modules[scheduled_task.task_module] = importlib.import_module(scheduled_task.task_module)

module = modules[scheduled_task.task_module]
function = getattr(module, scheduled_task.task_name)
cache[scheduled_task.task_module][scheduled_task.task_name] = function
module = modules[scheduled_task.task_module]
function = getattr(module, scheduled_task.task_name)
cache[scheduled_task.task_module][scheduled_task.task_name] = function

handler = cache[scheduled_task.task_module][scheduled_task.task_name]
handler.delay(*scheduled_task.arguments["args"], **scheduled_task.arguments["kwargs"])
scheduled += 1
handler = cache[scheduled_task.task_module][scheduled_task.task_name]
handler.delay(*scheduled_task.arguments["args"], **scheduled_task.arguments["kwargs"])
scheduled += 1

scheduled_tasks.delete()
cancelled_tasks.delete()
scheduled_tasks.delete()

self.delete_items(cancelled_tasks)

self.stdout.write(self.style.SUCCESS(f"Successfully scheduled {scheduled} Tasks"))

def rerun_pending_tasks(self):
tolerance = timedelta(minutes=TOLERANCE)

ids = TaskManager.objects.filter(last_run__lt=self.utc_now - tolerance, status="PENDING").values_list(
"id", flat=True
)
task_managers = TaskManager.objects.filter(last_run__lt=self.utc_now - tolerance, status="PENDING")
if count := task_managers.count() == 0:
msg = self.style.SUCCESS("No TaskManager's available to re-run")
self.stdout.write(self.style.SUCCESS(msg))
return

for id in ids:
tasks.mark_task_as_pending.delay(id, force=True)
limit = LIMITS["small"]
a = 0
page = 0

if ids:
msg = self.style.SUCCESS(f"Rerunning TaskManager's {', '.join([str(id) for id in ids])}")
while True:
if a >= count:
break

else:
msg = self.style.SUCCESS("No TaskManager's available to re-run")
a = page * limit
page += 1
b = page * limit

ids = task_managers[a:b].values_list("id", flat=True)

for id in ids:
tasks.mark_task_as_pending.delay(id, force=True)

if ids:
msg = self.style.SUCCESS(f"Rerunning TaskManager's {', '.join([str(id) for id in ids])}")

self.stdout.write(self.style.SUCCESS(msg))
self.stdout.write(self.style.SUCCESS(msg))

def daily_report(self):
if not is_report_time():
Expand Down

0 comments on commit 333c848

Please sign in to comment.