Writing Celery Tasks¶
The primary function of the Triggers framework is to execute Celery tasks.
For the most part, these look the same as any other Celery tasks, with two notable differences:
- The tasks must extend
triggers.task.TriggerTask
. - Override the
_run
method instead ofrun
(note the leading underscore).
As an example, consider the following trigger task:
trigger_manager.update_configuration({
't_importSubject': {
'after': ['firstPageReceived', 'questionnaireComplete'],
'run': 'app.tasks.ImportSubject',
},
...
})
The idea here is that the ImportSubject
Celery task takes data from the
first page of response data and creates a Subject
record in the database.
The application will help the Celery task by attaching the response data to the
firstPageReceived
trigger when it fires:
def first_page_responses(request):
"""
Django view that processes the first page of response data
from the client.
"""
responses_form = QuestionnaireResponsesForm(request.POST)
if responses.is_valid():
...
trigger_manager.fire(
trigger_name = 'firstPageReceived',
trigger_kwargs = {'responses': responses.cleaned_data},
)
...
Note that when the firstpageReceived
trigger is fired, the response data are
attached via trigger_kwargs
.
Here’s what the ImportSubject
Celery task might look like:
from my_app.models import Subject
from triggers.task import TaskContext, TriggerTask
class ImportSubject(TriggerTask):
def _run(self, context):
# type: (TaskContext) -> dict
# Load kwargs provided when the ``firstPageReceived``
# trigger was fired by the application.
page_data =\
context.trigger_kwargs['firstPageReceived']['responses']
# Create a new ``subject`` record.
new_subject =\
Subject.objects.create(
birthday = page_data['birthday'],
name = page_data['name'],
)
# Make the PK value accessible to tasks that are
# waiting for a cascade.
return {
'subjectId': new_subject.pk,
}
The ImportSubject
task’s _run
method (note the leading underscore) does
3 things:
- Load the response data from the
firstPageReceived
trigger kwargs. - Import the data into a new
Subject
record. - Return the resulting ID value so that when the task cascades, other tasks will be able to use it (more on this later).
Task Context¶
The only argument passed to the _run
method is a
triggers.task.TaskContext
object.
The TaskContext
provides everything that your task will need to
interact with the Triggers framework infrastructure:
Trigger Manager¶
context.manager
is a trigger manager instance that you can leverage in your
task to interact with the Triggers framework. For example, you can use
context.manager
to fire additional triggers as your task runs.
Trigger Kwargs¶
As noted above, whenever the application fires a trigger, it can attach optional kwargs to that trigger.
These kwargs are then made available to your task in two ways:
context.trigger_kwargs
returns the raw kwargs for each trigger that caused your task to run.context.filter_kwargs()
uses the Filters library to validate and transform thetrigger_kwargs
.
The above example shows how to use context.trigger_kwargs
. Here is an
alternate approach that uses context.filter_kwargs()
instead:
import filters as f
class ImportSubject(TriggerTask):
def _run(self, context):
# type: (TaskContext) -> dict
filtered_kwargs =\
context.filter_kwargs({
'firstPageReceived': {
'responses':
f.Required
| f.Type(dict)
| f.FilterMapper({
'birthday': f.Required | f.Date,
'name': f.Required | f.Unicode,
}),
},
})
page_data = filtered_kwargs['firstPageReceived']['responses']
...
Note
If you have worked with FilterMappers in the past, the above structure should look very familiar.
Cascading¶
When the Celery task finishes successfully, the trigger manager will cause a “cascade” by firing the corresponding trigger task’s name as a trigger.
For example, consider the trigger task from earlier:
trigger_manager.update_configuration({
't_importSubject': {
'after': ['firstPageReceived', 'questionnaireComplete'],
'run': 'app.tasks.ImportSubject',
},
...
})
The trigger task is named t_importSubject
, so when the ImportSubject
Celery task finishes, the trigger manager will automatically fire a trigger
named t_importSubject
.
But, what kwargs are attached to this trigger?
If the Celery task returns a mapping (e.g., dict), then that will be used as the kwargs for the cascading trigger.
Going back to the ImportSubject
example:
class ImportSubject(TriggerTask):
def _run(self, context):
...
# Make the PK value accessible to tasks that are
# waiting for a cascade.
return {
'subjectId': new_subject.pk,
}
When this task finishes, the trigger manager will cascade like this:
trigger_manager.fire(
trigger_name = 't_importSubject',
trigger_kwargs = {'subjectId': new_subject.pk},
)
Logging¶
If your Celery task needs to use a logger, consider using
context.get_logger_context()
.
The logger instance returned by this method includes a few features that integrate closely with the trigger manager.
See Logging for more information.
Retrying¶
To retry a Celery task mid-execution, the method looks similar to a regular Celery task:
class ImportBrowserMetadata(TriggerTask):
# Specify the max number of retries allowed.
max_retries = 3
def _run(self, context):
# type: (TaskContext) -> dict
...
try:
# Try to load data from 3rd-party API...
metadata = api_client.post(...)
except HttpError as e:
# ... but if we are unable to connect,
# retry after a delay.
raise self.retry(exc=e, cooldown=10)
Note that this retry mechanism works a little differently from Celery’s retry:
- You must
raise self.retry()
; it won’t raise the exception for you. - Use
cooldown
instead ofcountdown
.eta
is not supported. - If desired, you can also specify replacement trigger kwargs to use when retrying the task.
If the Celery task exceeds its max_retries
, then it will raise a
triggers.task.MaxRetriesExceeded
.