Skip to content

Commit

Permalink
Merge pull request #202 from mobiusml/task_queue_timezone_bug
Browse files Browse the repository at this point in the history
Use Timezone Consistently in Task Repository
  • Loading branch information
movchan74 authored Nov 13, 2024
2 parents 165a7e3 + e0c7379 commit 50673f8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
19 changes: 11 additions & 8 deletions aana/storage/repository/task.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID

Expand Down Expand Up @@ -137,7 +137,6 @@ def fetch_unprocessed_tasks(
.with_for_update(skip_locked=True)
.all()
)

for task in tasks:
self.update_status(
task_id=task.id,
Expand Down Expand Up @@ -167,9 +166,9 @@ def update_status(
"""
task = self.read(task_id)
if status == TaskStatus.COMPLETED or status == TaskStatus.FAILED:
task.completed_at = datetime.now() # noqa: DTZ005
task.completed_at = datetime.now(timezone.utc)
if status == TaskStatus.ASSIGNED:
task.assigned_at = datetime.now() # noqa: DTZ005
task.assigned_at = datetime.now(timezone.utc)
task.num_retries += 1
if progress is not None:
task.progress = progress
Expand Down Expand Up @@ -249,8 +248,12 @@ def update_expired_tasks(
Returns:
list[TaskEntity]: the expired tasks.
"""
timeout_cutoff = datetime.now() - timedelta(seconds=execution_timeout) # noqa: DTZ005
heartbeat_cutoff = datetime.now() - timedelta(seconds=heartbeat_timeout) # noqa: DTZ005
timeout_cutoff = datetime.now(timezone.utc) - timedelta(
seconds=execution_timeout
)
heartbeat_cutoff = datetime.now(timezone.utc) - timedelta(
seconds=heartbeat_timeout
)
tasks = (
self.session.query(TaskEntity)
.filter(
Expand All @@ -268,7 +271,7 @@ def update_expired_tasks(
)
for task in tasks:
if task.num_retries >= max_retries:
if task.assigned_at <= timeout_cutoff:
if task.assigned_at.astimezone(timezone.utc) <= timeout_cutoff:
result = {
"error": "TimeoutError",
"message": (
Expand Down Expand Up @@ -313,7 +316,7 @@ def heartbeat(self, task_ids: list[str] | set[str]):
for task_id in task_ids
]
self.session.query(TaskEntity).filter(TaskEntity.id.in_(task_ids)).update(
{TaskEntity.updated_at: datetime.now()}, # noqa: DTZ005
{TaskEntity.updated_at: datetime.now(timezone.utc)},
synchronize_session=False,
)
self.session.commit()
6 changes: 3 additions & 3 deletions aana/tests/db/datastore/test_task_repo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ruff: noqa: S101

import asyncio
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

import pytest

Expand Down Expand Up @@ -51,7 +51,7 @@ def _create_sample_tasks():
db_session.commit()

# Create sample tasks with different statuses
now = datetime.now() # noqa: DTZ005
now = datetime.now(timezone.utc)

task1 = TaskEntity(
endpoint="/test1",
Expand Down Expand Up @@ -291,7 +291,7 @@ def test_update_expired_tasks(db_session):
db_session.commit()

# Set up current time and a cutoff time
current_time = datetime.now() # noqa: DTZ005
current_time = datetime.now(timezone.utc)
execution_timeout = 3600 # 1 hour in seconds
heartbeat_timeout = 60 # 1 minute in seconds

Expand Down

0 comments on commit 50673f8

Please sign in to comment.