Task Runners

This page walks you through how to write a TaskRunner, which is base class that performs a computational task in Blast. There are two types of TaskRunner: (1) a TransientTaskRunner, which performs a computation on a transient in the Blast database (e.g., matching a host galaxy), and (2) a SystemTaskRunner which performs a system level task not related to a specific transient (e.g, ingest a batch of transients from TNS or clean data directories).

Writing your own TaskRunner in Blast should be straightforward if you inherit from TransientTaskRunner or SystemTaskRunner. These classes handle the house keeping associated with running Blast task, allowing you to just write the task code.

This pages explains how to write your own TransientTaskRunner and SystemTaskRunner.

Transient Task

A new TransientTaskRunner should be implemented in the app/host/transient_tasks.py module. We will now explain how to implement a TransientTaskRunner.

Process method

The _run_process method is where your task’s code should go. This method should contain all the necessary computations and saves to the database for your task to be completed. It takes a Transient object as an argument and must return a status message which indicates the status of the task after computation. As an example, let’s implement a simple task that just prints ‘processing’ and then returns the processed status message.

def _run_process(transient):
    print('processing')
    return = "processed"

Note

The available status messages can be found in app/host/fixtures/initial/setup_status.yaml. The _run_process method must return a string that matches the message field of one of the statuses in app/host/fixtures/initial/setup_status.yaml. If you want to use a new status add it to app/host/fixtures/initial/setup_status.yaml

Task name

The TaskRunner needs to specify which task it operates on. This is done through implementing the task_name property. This methods takes no arguments and returns a string which is the name of the task. Let’s say we are implementing a TransientTaskRunner that matches a transient to a host galaxy, this TransientTaskRunner will alter the status of the “Host match” Task,

@property
def task_name():
    return 'Host match'

Note

You need to add your new task and its name into app/host/fixtures/initial/setup_tasks.yaml making sure the return of task_name matches the name field in the fixture. This will ensure Blast registers your task on start up.

Prerequisites

The TaskRunner needs to also specify which tasks need to be completed before it should be run. This is done through implementing the _prerequisites method. This function tasks no arguments and should return a dictionary with the name and status of prerequisite tasks. For example, if before running your task you need the Host match task to have status “not processed” and the Cutout download task to have status “processed”, it would look like this.

def _prerequisites():
    return {'Host match': 'not processed', 'Cutout download': 'processed'}

This ensures that your TaskRunner will only run on transients in the Blast database meeting the prerequisites.

Note

The available tasks can be found in app/host/fixtures/initial/setup_tasks.yaml. The _prerequisites method must return a dictionary with keys that match the name field of one of the tasks in app/host/fixtures/initial/setup_tasks.yaml and values that match a status app/host/fixtures/initial/setup_status.yaml.

Failed Status

You can specify what status happens if your _run_process code throws and exception and fails. This is done by implementing the _failed_status_message method. This method takes no arguments and returns a string which is the message of the failed status. Let’s say we want the failed status to be the Status with the message “failed”,

def _failed_status_message()
    return "failed"

If you do not implement this method it will default to a “failed” status.

Note

The available status messages can be found in app/host/fixtures/initial/setup_status.yaml. The _failed_status_message method must return a string that matches the message field of one of the statuses in app/host/fixtures/initial/setup_status.yaml. If you want to use a new status add it to app/host/fixtures/initial/setup_status.yaml

Task Frequency

You can specify the frequency at which as task should be run Blast by implementing the task_frequency_seconds property. This function must return a positive integer. If you do not implement this method, it will default to 60 seconds.

@property
def task_frequency_seconds(self):
    return 60

Run on start up

You can specify whether your task runs periodically on start up of Blast or needs to be explicitly triggered from the Django admin by implementing the task_initially_enabled property. If you do not implement this method it will default to true, meaning that the task will launch automatically on startup.

@property
def task_initially_enabled(self):
    """Will the task be run on start up"""
    return True

Full example class

Putting this all together, the example TransientTaskRunner class would be,

from .base_tasks import TransientTaskRunner

class ExampleTaskRunner(TransientTaskRunner):
    """An Example :code:`TaskRunner`"""

    def _run_process(transient):
        print('processing')
        return = "processed"

    def _prerequisites():
        return {"Host match": "not processed", "Cutout download": "processed"}

    @property
    def task_name():
        return "Host match"

    @property
    def task_frequency_seconds(self):
        return 60

    @property
    def task_initially_enabled(self):
        return True

    def _failed_status_message()
        return "failed"

System Task

The SystemTaskRunner is somewhat simpler to implement as there is no chaining of prerequisite tasks, and the results do not need to be displayed in the Blast web interface. New system task runners should be implemented in the app/host/system_tasks.py module. A full SystemTaskRunner would look like:

from .base_tasks import SystemTaskRunner

class ExampleTaskRunner(SystemTaskRunner):
    """An Example TaskRunner"""

    @property
    def task_frequency_seconds(self):
        return 60

    @property
    def task_initially_enabled(self):
        return True

    def run_process(transient):
        #Put your code here!
        return = "processed"

Registering your task

For Blast to actually run your task you have to register it within the app. For both a SystemTaskRunner and a TransientTaskRunner you have to add an instance of your Taskrunner to the periodic_tasks list in app/host/task.py.

To check that your task has been registered and is being run in Blast go to http://0.0.0.0:8000/admin/ login and then go to http://0.0.0.0:8000/admin/django_celery_beat/periodictask/ and you should see your task and its schedule.

You can check if your task is running without error by going to the Flower dashboard at http://0.0.0.0:8888.