Skip to content

Commit

Permalink
add task scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
jefer94 committed Mar 23, 2024
1 parent 7af94e2 commit c81625b
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 72 deletions.
75 changes: 75 additions & 0 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: Check

on:
push: {}
pull_request: {}

env:
PYTHON_VERSION: 3.11.8
PYTHONUNBUFFERED: 1

jobs:
tests:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: "pip"

- name: Install dependencies
run: |
pip install hatch
- name: Run tests
run: |
hatch run test
- uses: codecov/codecov-action@v4
if: ${{ github.event_name == 'pull_request' || github.repository == 'breatheco-de/apiv2' }}
with:
token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos
files: ./coverage.xml # optional
flags: unittests # optional
name: codecov-umbrella # optional
fail_ci_if_error: true # optional (default = false)
verbose: true # optional (default = false)

- name: Upload coverage data to coveralls.io
if: ${{ github.event_name == 'pull_request' || github.repository == 'breatheco-de/apiv2' }}
run: |
hatch run coveralls --service=github
env:
GITHUB_TOKEN: ${{ github.token }}

pages:
if: >-
github.repository == 'breatheco-de/linked-services-django-plugin' &&
github.event_name == 'push' &&
github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: "pip"

- name: Install dependencies
run: |
pip install hatch
- name: Deploy docs
run: hatch run mkdocs gh-deploy --force
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,22 @@ license = "LGPL-3.0-or-later"
keywords = []
authors = [{ name = "jefer94", email = "[email protected]" }]
classifiers = [
"Development Status :: 4 - Beta",
"Development Status :: 5 - Production/Stable",
"Programming Language :: Python",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Framework :: Django :: 4.2",
"Framework :: Django :: 5.0",
"License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)",
"Topic :: Software Development :: Libraries",
]
dependencies = ["celery"]

[project.urls]
Documentation = "https://github.com/breatheco-de/celery-task-manager-django-plugin#readme"
Documentation = "https://breatheco-de.github.io/celery-task-manager-django-plugin/"
Issues = "https://github.com/breatheco-de/celery-task-manager-django-plugin/issues"
Source = "https://github.com/breatheco-de/celery-task-manager-django-plugin"

Expand Down
2 changes: 1 addition & 1 deletion src/task_manager/__about__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2024-present jefer94 <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0-or-later
__version__ = "1.1.1"
__version__ = "1.2.0"
44 changes: 44 additions & 0 deletions src/task_manager/core/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import inspect
from datetime import datetime
from decimal import Decimal
from typing import Callable


def parse_payload(payload: dict):
if not isinstance(payload, dict):
return payload

for key in payload.keys():
# TypeError("string indices must be integers, not 'str'")
if isinstance(payload[key], datetime):
payload[key] = payload[key].isoformat().replace("+00:00", "Z")

elif isinstance(payload[key], Decimal):
payload[key] = str(payload[key])

elif isinstance(payload[key], list) or isinstance(payload[key], tuple) or isinstance(payload[key], set):
array = []
for item in payload[key]:
array.append(parse_payload(item))

payload[key] = array

elif isinstance(payload[key], dict):
payload[key] = parse_payload(payload[key])

return payload


def get_fn_desc(function: Callable) -> tuple[str, str] | tuple[None, None]:
if not function:
return None, None

if hasattr(function, "__module__"):
module_name = function.__module__

else:
module_name = inspect.getmodule(function).__name__

function_name = function.__name__

return module_name, function_name
46 changes: 4 additions & 42 deletions src/task_manager/core/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import logging
import traceback
from datetime import datetime
from decimal import Decimal
from typing import Any, Callable, Optional

import celery
from django.utils import timezone

from task_manager.core.actions import get_fn_desc, parse_payload

from .exceptions import AbortTask, ProgrammingError, RetryTask
from .settings import settings

Expand All @@ -27,31 +28,6 @@ class CircuitBreakerError(Exception):
pass


def parse_payload(payload: dict):
if not isinstance(payload, dict):
return payload

for key in payload.keys():
# TypeError("string indices must be integers, not 'str'")
if isinstance(payload[key], datetime):
payload[key] = payload[key].isoformat().replace("+00:00", "Z")

elif isinstance(payload[key], Decimal):
payload[key] = str(payload[key])

elif isinstance(payload[key], list) or isinstance(payload[key], tuple) or isinstance(payload[key], set):
array = []
for item in payload[key]:
array.append(parse_payload(item))

payload[key] = array

elif isinstance(payload[key], dict):
payload[key] = parse_payload(payload[key])

return payload


class TaskManager:
current_page: Optional[int]
total_pages: Optional[int]
Expand Down Expand Up @@ -98,20 +74,6 @@ def __init__(self, *args, **kwargs):

self.parent_decorator = celery.shared_task(*args, **kwargs)

def get_fn_desc(self, function: Callable) -> tuple[str, str] | tuple[None, None]:
if not function:
return None, None

if hasattr(function, "__module__"):
module_name = function.__module__

else:
module_name = inspect.getmodule(function).__name__

function_name = function.__name__

return module_name, function_name

def _get_fn(self, task_module: str, task_name: str) -> Callable | None:
module = importlib.import_module(task_module)
return getattr(module, task_name, None)
Expand Down Expand Up @@ -175,8 +137,8 @@ def __call__(self, function):

@functools.wraps(function)
def wrapper(*args, **kwargs):
task_module, task_name = self.get_fn_desc(function)
reverse_module, reverse_name = self.get_fn_desc(self.reverse)
task_module, task_name = get_fn_desc(function)
reverse_module, reverse_name = get_fn_desc(self.reverse)
arguments = parse_payload(
{
"args": args[1:] if self.bind else args,
Expand Down
76 changes: 76 additions & 0 deletions src/task_manager/django/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from datetime import UTC, datetime
from typing import Any, Callable

from dateutil.relativedelta import relativedelta

from task_manager.core.actions import get_fn_desc, parse_payload
from task_manager.django.models import ScheduledTask

delta_units = {
"s": lambda n: relativedelta(seconds=n),
"m": lambda n: relativedelta(minutes=n),
"h": lambda n: relativedelta(hours=n),
"d": lambda n: relativedelta(days=n),
"w": lambda n: relativedelta(weeks=n),
}


def schedule_task(task: Callable, eta: str) -> Callable[..., None]:
"""
Schedule a task.
```py
# You should declare globally this handler
schedule_my_task_1d = schedule_task(my_task, '1d')
# Then, you should use it in a function
schedule_my_task_1d(1, 2, 3, name='my_task')
```
```py
```
"""

if callable(task) is False:
raise ValueError("Task must be callable")

if hasattr(task, "delay") is False:
raise ValueError("Task must be a Celery task")

number = eta[:-1]
unit = eta[-1]

if number.isnumeric() is False:
raise ValueError("ETA value must be a number.")

handler = delta_units.get(unit, None)
if handler is None:
raise ValueError(f"ETA unit must be one of {', '.join(delta_units.keys())}.")

module_name, function_name = get_fn_desc(task)

def create_schedule_instance(*args: Any, **kwargs: Any):
delta = handler(number)
now = datetime.now(tz=UTC)

arguments = parse_payload(
{
"args": args,
"kwargs": kwargs,
}
)

ScheduledTask.objects.create(
task_module=module_name,
task_name=function_name,
arguments=arguments,
duration=delta,
eta=now + delta,
)

return create_schedule_instance


aaa = schedule_task()

aaa()
Loading

0 comments on commit c81625b

Please sign in to comment.