Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support current_task for batch tasks #172

Open
wants to merge 1 commit into
base: batch-task-log-iterator
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions tasktiger/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,3 @@ def tasktiger_processor(logger, method_name, event_dict):
event_dict['task_id'] = g['current_batch_task'].id

return event_dict


def batch_param_iterator(params):
"""
Helper to set current batch task.

This helper should be used in conjunction with tasktiger_processor
to facilitate logging of task ids.
"""
for i, p in enumerate(params):
g['current_batch_task'] = g['current_tasks'][i]
yield p
g['current_batch_task'] = None
8 changes: 6 additions & 2 deletions tasktiger/tasktiger.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,12 @@ def init(self, connection=None, config=None, setup_structlog=False):
def _get_current_task(self):
if g['current_tasks'] is None:
raise RuntimeError('Must be accessed from within a task')
if g['current_task_is_batch']:
raise RuntimeError('Must use current_tasks in a batch task.')

if g['current_task_is_batch'] and g['current_batch_task']:
return g['current_batch_task']
elif g['current_task_is_batch'] and not g['current_batch_task']:
raise RuntimeError('Must use batch_param_iterator in batch task.')

return g['current_tasks'][0]

def _get_current_tasks(self):
Expand Down
14 changes: 14 additions & 0 deletions tasktiger/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from ._internal import g


def batch_param_iterator(params):
"""
Helper to set current batch task.

This helper should be used in conjunction with tasktiger_processor
to facilitate logging of task ids.
"""
for i, p in enumerate(params):
g['current_batch_task'] = g['current_tasks'][i]
yield p
g['current_batch_task'] = None
14 changes: 11 additions & 3 deletions tests/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
from math import ceil
import time

import pytest
import redis

from tasktiger import RetryException
from tasktiger import RetryException, g
from tasktiger.retry import fixed
from tasktiger.utils import batch_param_iterator

from .config import DELAY, TEST_DB, REDIS_HOST
from .utils import get_tiger
Expand Down Expand Up @@ -139,18 +141,24 @@ def verify_current_task():


@tiger.task(batch=True, queue='batch')
def verify_current_tasks(tasks):
def verify_current_tasks(params):
with redis.Redis(
host=REDIS_HOST, db=TEST_DB, decode_responses=True
) as conn:
try:
tasks = tiger.current_task
tiger.current_task
except RuntimeError:
# This is expected (we need to use current_tasks)

tasks = tiger.current_tasks
conn.rpush('task_ids', *[t.id for t in tasks])

for i, p in enumerate(batch_param_iterator(params)):
assert tiger.current_task.id == g['current_tasks'][i].id

with pytest.raises(RuntimeError):
tiger.current_task.id


@tiger.task()
def sleep_task(delay=10):
Expand Down
3 changes: 2 additions & 1 deletion tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import structlog

from tasktiger import TaskTiger, Worker, g
from tasktiger.logging import tasktiger_processor, batch_param_iterator
from tasktiger.logging import tasktiger_processor
from tasktiger.utils import batch_param_iterator

from .test_base import BaseTestCase
from .utils import get_tiger, get_redis
Expand Down