Support success event for batched tasks #33
You're welcome!
I recently tried to make this a bit clearer, but the main difference is that It is probably missing a few properties as those aren't necessary to actually execute a task. I'm not sure it really makes sense for this class to have any methods on it though.
If there are "simple" properties missing, we should just add them; that should be straightforward. (Which ones in particular do you need?)
For the We do fire some events already: celery-batches/celery_batches/__init__.py Lines 305 to 309 in 29d5f56
Celery seems to fire this by calling But I'm unsure where the appropriate place to fire success is. Some options spring to mind, but there might be others:
Sorry for the long answer! I need to look up again what those events mean, but I think one of those is a reasonable option. 😄
@clokep, thanks for your informative answer. Currently I'm calling But I'm also checking the Celery code to see how it fires this event with the appropriate data (task runtime, received_time, start_time, etc.).
When you say "currently" are you using Celery batches now?
Great. 👍 Hopefully it isn't too hard to make it work.
(I've also retitled this, hopefully it captures what you were going for.)
Hi @clokep, I've checked the Celery code related to the following events during the task life-cycle.
To emit these events for the tasks that are processed in batches, I've made the following updates to the task class:
    # emits task received signal
    from celery import signals

    def Strategy(self, task, app, consumer):
        # `eventer` and `request` come from the surrounding Strategy code, omitted here.
        def task_message_handler(message, body, ack, reject, callbacks, **kw):
            events = eventer and eventer.enabled
            send_event = eventer and eventer.send
            task_sends_events = events and task.send_events
            signals.task_received.send(sender=consumer, request=request)  # emits task received signal
            if task_sends_events:
                send_event(
                    "task-received",
                    uuid=request.id,
                    name=request.name,
                    args=request.argsrepr,
                    kwargs=request.kwargsrepr,
                    root_id=request.root_id,
                    parent_id=request.parent_id,
                    retries=request.request_dict.get("retries", 0),
                    eta=request.eta and request.eta.isoformat(),
                    expires=request.expires and request.expires.isoformat(),
                )

    def flush(self, requests):
        def on_accepted(pid, time_accepted):
            [req.acknowledge() for req in acks_late[False]]
            for req in requests:
                req.start_time = time_accepted
                req.send_event("task-started")  # emits request started event

    from time import monotonic
    from billiard.einfo import ExceptionInfo

    def flush(self, requests):
        def on_return(results):
            """Handler for the value returned by the task function (i.e. the consumer of the batched requests)."""
            if isinstance(results, (list,)):
                [req.acknowledge() for req in acks_late[True]]
                for index, result in enumerate(results):
                    request = requests[index]  # input request that was part of the batch
                    request_runtime = monotonic() - request.start_time
                    if isinstance(result, Exception):
                        type_, _, tb = type(result), result, result.__traceback__
                        # Wrap the exception with billiard's ExceptionInfo so that exceptions
                        # raised by the task function are compatible with Celery's `on_failure` handler
                        excep = ExceptionInfo((type_, _, tb))
                        failed__retval__runtime = (1, excep, request_runtime)
                    else:
                        failed__retval__runtime = (0, result, request_runtime)
                    # emits request success event, which will be propagated by celery
                    request.on_success(failed__retval__runtime)

@clokep, can you please review this and give feedback, so I can update the code as per your suggestions?
@srinivaas14 The Received and Started changes look pretty reasonable, I'm a bit more skeptical about the Success/Failure ones though since I don't think we require that any particular results be returned from the task. It would probably be easier to look at a pull request to really see the changes (and let CI run).
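To illustrate the concern about return values, here is a minimal sketch of checking the shape of the returned value before attributing results to individual requests. `StubRequest` and `pair_results` are hypothetical names used only for illustration and are not part of celery-batches or Celery; the `(failed, retval, runtime)` tuple simply mirrors the one built in the snippet above.

```python
from time import monotonic


class StubRequest:
    """Hypothetical stand-in for a batched request (not the celery-batches class)."""

    def __init__(self, id, start_time):
        self.id = id
        self.start_time = start_time


def pair_results(requests, results, now=None):
    """Pair each request with a (failed, retval, runtime) tuple, or return None.

    None means the task did not return exactly one result per request, so
    results cannot be attributed to individual requests.
    """
    if not isinstance(results, (list, tuple)) or len(results) != len(requests):
        return None
    now = monotonic() if now is None else now
    paired = []
    for request, result in zip(requests, results):
        # An Exception instance in the list is treated as a per-request failure.
        failed = 1 if isinstance(result, Exception) else 0
        paired.append((request, (failed, result, now - request.start_time)))
    return paired
```

When `pair_results` returns `None`, the caller would need some other policy (for example, reporting nothing per request); which policy is right is exactly the open question in this thread.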
Hi @clokep, thanks for the comments.
These Success/Failure events are for the batched requests that we are executing in our task. They will be useful for tracking the state and result of each request (which is part of the batched request list) in monitoring tools like Flower. Otherwise the request will remain in the 'Started' state even after we save (i.e
Sure, I'll raise a pull request with the above-mentioned changes.
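To show why a request stays in 'Started' when no success or failure event follows, here is a deliberately simplified sketch of how an event listener could derive task state from the event stream. It is an assumption-laden model, not Flower's or Celery's actual code: `EVENT_TO_STATE` and `latest_states` are hypothetical names, and real monitors handle ordering and more event types.

```python
# Event type -> state name, using Celery's task event names.
EVENT_TO_STATE = {
    "task-received": "RECEIVED",
    "task-started": "STARTED",
    "task-succeeded": "SUCCESS",
    "task-failed": "FAILURE",
}


def latest_states(events):
    """Return {uuid: state} after replaying (event_type, uuid) pairs in order."""
    states = {}
    for event_type, uuid in events:
        state = EVENT_TO_STATE.get(event_type)
        if state is not None:
            # The most recent known event wins in this simplified model.
            states[uuid] = state
    return states
```

For example, replaying `task-received` and `task-started` for one request, with no later event, leaves that request in `STARTED`.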
Right, I think it is good to send the events, but I think the implementation given above won't work with the way
Hi @clokep, yes, you are correct about the Success/Failure events implementation. Here I'm assuming that the task will return an ordered list of results. As you mentioned, I'll try to re-implement this to be compatible with the current Apart from this, I've raised a pull request, which has a feature for Let me know if any further info is required.
Hi @clokep, thanks for the package. I'm interested to know a bit more about `class SimpleRequest`. The docstring for `class SimpleRequest` says that SimpleRequest generally should have the same properties as :class:`~celery.worker.request.Request`, but in the code `class SimpleRequest` has only a subset of the properties of :class:`~celery.worker.request.Request`. So if I want to use some of the methods (e.g. `send_event`) or properties of :class:`~celery.worker.request.Request`, what would be the suggested way to do that with `class SimpleRequest`?
My use case: I want to call `send_event('task-succeeded')` after processing and saving (in the result backend) the result of each request (which I received from the list of batched requests), so that Flower (monitoring tool) or any other event listener can mark the request as successfully processed. Otherwise the request stays in the pending state.
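As a rough illustration of the use case, the sketch below sends a `task-succeeded` event for a single batched request through an event dispatcher. `RecordingDispatcher` and `send_succeeded` are hypothetical names, and the dispatcher is a stand-in so the example runs without a worker; in a real worker the dispatcher would presumably be the `eventer` object seen in the snippets in this thread. The `uuid`, `result` and `runtime` fields follow Celery's documented `task-succeeded` event, and passing `repr(result)` is an assumption about how the result should be rendered.

```python
class RecordingDispatcher:
    """Hypothetical stand-in for the worker's event dispatcher."""

    def __init__(self, enabled=True):
        self.enabled = enabled
        self.sent = []

    def send(self, type, **fields):
        # Record the event instead of publishing it to the broker.
        self.sent.append((type, fields))


def send_succeeded(dispatcher, request_id, result, runtime):
    """Send 'task-succeeded' for one batched request if events are enabled."""
    if dispatcher is None or not dispatcher.enabled:
        return False
    dispatcher.send(
        "task-succeeded",
        uuid=request_id,
        result=repr(result),
        runtime=runtime,
    )
    return True
```

Guarding on `enabled` mirrors the `events = eventer and eventer.enabled` check in the thread, so nothing is sent when the worker runs without events.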