Trigger Managers

The trigger manager acts as the controller for the Triggers framework. Working in conjunction with a storage backend, it provides an interface for effecting changes on a triggers session.

From the Basic Concepts documentation, you can see that initializing a trigger manager is fairly straightforward:

from triggers import TriggerManager

trigger_manager = TriggerManager(storage_backend)

Here, storage_backend is an instance of a storage backend (for example, CacheStorageBackend).

Interacting with Trigger Managers

Trigger managers provide the following methods:

update_configuration(configuration)
Initializes or updates the trigger task configuration. See Writing Celery Tasks for more information.
fire(trigger_name, [trigger_kwargs])
Fires a trigger. See Getting Started for more information on how to use this method.
replay_failed_instance(failed_instance, [replacement_kwargs])

Given a failed task instance, creates a copy and attempts to run it. If the original task failed because of an invalid kwarg value, you can provide replacement kwargs for the copy.

See Replaying Failed Task Instances for more information.

Note

As the name implies, only failed instances can be replayed.

skip_failed_instance(failed_instance, [cascade], [result])

Given a failed task instance, marks the instance as skipped, so that it is considered to be resolved.

If desired, you may also specify a fake result for the task instance, to trigger a cascade.

See Skipping Failed Task Instances for more information.

Note

As the name implies, only failed instances can be skipped.

update_instance_status(task_instance, status, [metadata], [cascade], [cascade_kwargs])
Manually changes the status for a task instance. This method can also be used to trigger a cascade.
update_instance_metadata(task_instance, metadata)
Manually updates the metadata for a task instance. This method can be used to attach arbitrary data to a task instance for logging/troubleshooting purposes.
mark_instance_logs_resolved(task_instance)
Given a task instance, updates its metadata so that its log messages are marked as resolved.
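
As a rough illustration of how replay_failed_instance() and skip_failed_instance() might be combined, the helper below replays a failed instance when corrected kwargs are available and skips it otherwise. This is a sketch, not part of the library: resolve_failed_instance and RecordingManager are hypothetical names, and RecordingManager is only a stand-in that records calls so that the snippet runs without a storage backend. Its default values for the optional arguments are assumptions. In an application you would pass a real trigger manager instead.

```python
class RecordingManager(object):
    """Hypothetical stand-in for a trigger manager; it only records calls."""

    def __init__(self):
        self.calls = []

    # The default values below are assumptions made for this sketch.
    def replay_failed_instance(self, failed_instance, replacement_kwargs=None):
        self.calls.append(('replay', failed_instance, replacement_kwargs))

    def skip_failed_instance(self, failed_instance, cascade=False, result=None):
        self.calls.append(('skip', failed_instance, cascade, result))


def resolve_failed_instance(manager, failed_instance, replacement_kwargs=None):
    """Replays the instance if corrected kwargs are given, else skips it."""
    if replacement_kwargs is not None:
        # Arguments are passed positionally, in the documented order.
        manager.replay_failed_instance(failed_instance, replacement_kwargs)
        return 'replayed'

    # No corrected kwargs: mark the failure as resolved without a cascade.
    manager.skip_failed_instance(failed_instance, False)
    return 'skipped'
```

Keeping this decision in one helper makes it easier to apply the same policy wherever failed instances are handled.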

Writing Custom Trigger Managers

You can customize the behavior of the trigger manager(s) that your application interacts with.

For example, you can write a custom trigger manager that contains additional logic to finalize sessions.

Your trigger manager must extend the triggers.manager.TriggerManager class.

There is only one attribute that must be implemented in order to create a custom trigger manager:

name: Text

A unique identifier for your trigger manager.

Generally this matches the name of the trigger manager’s entry point in your project’s setup.py file (see below).

Hooks

Whenever the base trigger manager completes certain actions, it invokes a corresponding hook, which you can override in your custom trigger manager.

The following hooks are supported:

_post_fire(trigger_name, tasks_scheduled)
Invoked after processing a call to fire(). It receives the name of the trigger that was fired, and a list of any task instances that were scheduled to run as a result.
_post_replay(task_instance)

Invoked after processing a call to replay_failed_instance(). It receives the replayed task instance.

Tip

You can find the failed instance by inspecting the replayed instance’s metadata and extracting the parent item:

def _post_replay(self, task_instance):
  # type: (TaskInstance) -> None
  parent_name = task_instance.metadata['parent']  # type: Text
  parent_instance = self.storage[parent_name]  # type: TaskInstance

_post_skip(task_instance, cascade)

Invoked after processing a call to skip_failed_instance(). It receives the skipped task instance, and a boolean indicating whether a cascade was simulated.

Note

This method gets invoked after the cascade happens (i.e., after _post_fire() is invoked).
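
The hook mechanism can be pictured with the following sketch. StandInManager is a hypothetical, heavily simplified substitute for triggers.manager.TriggerManager, included only so that the example runs on its own; the real class does far more work before it invokes its hooks. AuditingManager and its audit_log attribute are likewise invented for illustration.

```python
class StandInManager(object):
    """Simplified stand-in for the base trigger manager (not the real class)."""
    name = 'stand_in'

    def fire(self, trigger_name, trigger_kwargs=None):
        # The real manager would look up and schedule matching tasks here.
        # This stand-in pretends that exactly one task instance was scheduled.
        tasks_scheduled = ['{}#0'.format(trigger_name)]

        # Once the action is complete, hand control to the hook.
        self._post_fire(trigger_name, tasks_scheduled)

    def _post_fire(self, trigger_name, tasks_scheduled):
        """Hook; does nothing unless a subclass overrides it."""


class AuditingManager(StandInManager):
    """Custom manager that records every trigger it fires."""
    name = 'auditing'

    def __init__(self):
        self.audit_log = []

    def _post_fire(self, trigger_name, tasks_scheduled):
        # Runs after fire() has finished its own work.
        self.audit_log.append((trigger_name, list(tasks_scheduled)))
```

Because the subclass overrides only the hook, the base class's fire() logic stays untouched, and the custom behavior runs after it.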

Registering Your Trigger Manager

Because of the way trigger tasks work, you must register your custom trigger manager in order for it to work correctly.

To do this, you must create a custom entry point.

In your project’s setup.py file, add a triggers.managers entry point for your custom trigger manager.

For example, if you wanted to register app.triggers.CustomManager, you would add the following to your project’s setup.py file:

from setuptools import setup

setup(
  ...

  entry_points={
    'triggers.managers': [
      'custom_manager = app.triggers:CustomManager',
    ],
  },
)

Tip

Any time you make changes to setup.py, you must reinstall your project (e.g., by running pip install -e . again) before the changes will take effect.
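
To check that the entry point was picked up after reinstalling, you can list what is registered under the triggers.managers group. The sketch below uses only the standard library's importlib.metadata (Python 3.8 or later). It shows one way to inspect the registration and is not necessarily how the Triggers framework itself loads managers. The function name registered_manager_names is made up for this example.

```python
from importlib import metadata


def registered_manager_names(group='triggers.managers'):
    """Returns the sorted names of all entry points registered in ``group``."""
    entry_points = metadata.entry_points()

    if hasattr(entry_points, 'select'):
        # Python 3.10+: entry points can be filtered with select().
        selected = entry_points.select(group=group)
    else:
        # Python 3.8 and 3.9: a plain dict keyed by group name.
        selected = entry_points.get(group, [])

    return sorted(entry_point.name for entry_point in selected)
```

With the setup.py above installed, the returned list would be expected to include custom_manager.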

Once you’ve registered your trigger manager, you can then use it in your application:

from app.triggers import CustomManager
from triggers import CacheStorageBackend

trigger_manager = CustomManager(CacheStorageBackend(session_uid))

Important

Make sure that your application always uses the same trigger manager (unless you are 110% sure you know what you are doing).
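
One way to make this harder to get wrong is to build trigger managers in a single place. The following is a minimal sketch under that assumption. make_manager_factory and get_trigger_manager are hypothetical names, and the constructor calls simply mirror the usage example above (a manager class that accepts a storage backend, and a backend class that accepts a session UID).

```python
def make_manager_factory(manager_class, storage_class):
    """Returns a function that builds managers of one fixed type."""

    def get_trigger_manager(session_uid):
        # Every caller gets the same manager and storage classes,
        # so the application cannot mix manager types by accident.
        return manager_class(storage_class(session_uid))

    return get_trigger_manager
```

An application could then create the factory once, for example with make_manager_factory(CustomManager, CacheStorageBackend), and import the resulting function wherever it needs a trigger manager.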