Task Runners¶
When the trigger manager determines that a task instance is ready to run, it instantiates a runner to handle the execution.
By default, this runner uses Celery
(triggers.runners.CeleryTaskRunner
), but you can customize this.
For example, during unit tests, the trigger
manager will use triggers.runners.ThreadingTaskRunner
instead.
CeleryTaskRunner¶
As the name implies, CeleryTaskRunner
executes task instances using
Celery.
Each trigger task is implemented as a Celery task, and when the
trigger manager schedules a task instance for execution, the
CeleryTaskRunner
will schedule a matching Celery task for execution.
Tip
You can leverage Celery’s router to send tasks to different queues, just like regular Celery tasks.
ThreadingTaskRunner¶
ThreadingTaskRunner
operates completely independently from Celery.
Instead of sending tasks to the Celery broker, it executes each task in a
separate thread.
Generally, this runner is only used during testing, but in certain cases, it may be useful to utilize this runner in other contexts.
Tip
If you need your application to wait for all running tasks to finish before
continuing, invoke ThreadingTaskRunner.join_all()
.
Note that this will wait for all running tasks to finish (including any cascades that may occur).
It is not possible (nor in line with the philosophy of the triggers framework) to wait for a particular task to finish before continuing. If you need certain logic to run after a particular task finishes, it is recommended that you implement that logic as a separate task that is triggered by a cascade from the first task.
Writing Your Own Task Runner¶
As with Trigger Managers and Storage Backends, you can inject your own task runners into the Triggers framework.
Anatomy of a Task Runner¶
A task runner must extend triggers.runners.BaseTaskRunner
. The base
class declares the following attributes/methods that you must implement in your
custom task runner:
name: Text
A unique identifier for your task runner.
Generally this matches the name of the task runner’s entry point in your project’s
setup.py
file (see below).run(self, manager: TriggerManager, task_instance: TaskInstance) -> NoReturn
Given a trigger manager and task instance, finds the correct Celery task (i.e., using the
resolve()
method) and executes it.Tip
See
ThreadingTaskRunner.run()
for a sample implementation.
Registering Your Task Runner¶
As with trigger managers, you must register your custom task runner before it can be used.
To do this, define a triggers.runners
entry point in your project’s
setup.py
file:
from setuptools import setup
setup(
...
entry_points = {
'triggers.runners': [
'custom_runner = app.triggers:CustomRunner',
],
},
)
Tip
Any time you make changes to setup.py
, you must reinstall your project
(e.g., by running pip install -e .
again) before the changes will take
effect.
Using Your Task Runner¶
Unlike Trigger Managers and Storage Backends, your application does not select the task runner directly.
Instead, the task runner is configured via one of two methods (in descending order of priority):
In the trigger task’s using clause.
Add your custom task runner to each task’s configuration:
from app.triggers import CustomRunner trigger_manager.update_configuration({ 't_importSubject': { ... 'using': CustomRunner.name, }, ... })
Tip
This approach is useful if you only want some of your tasks to use the custom task runner (whereas the rest should use e.g., the default
CeleryTaskRunner
).Via the trigger manager’s
default_task_runner_name
property.In order for this to work correctly, you must subclass
TriggerManager
:class CustomTriggerManager(TriggerManager): name = 'custom' default_task_runner_name = CustomRunner.name trigger_manager = CustomTriggerManager(...) tirgger_manager.fire(...)
Important
Don’t forget to register your custom trigger manager!