Enhancing Airflow DAGs with Custom Short Circuit Operators

Sophia PolitoSophia Polito
2 min read

Airflow short circuits control the execution flow of tasks in your DAG. A short circuit will trigger based on some conditions and then skip all downstream tasks. For example if you have a task failure you may want the short circuit to skip all remaining tasks, if the downstream tasks rely on earlier task completion. Custom short circuits can come in handy when you only want to skip specific tasks, rather than your entire DAG flow.

We can create a custom short circuit function as follows:

from airflow.operators.python import get_current_context
from airflow.models.skipmixin import SkipMixin

def get_task_state(task_id):
    context = get_current_context()
    task_state = context["dag_run"].get_task_instance(task_id).state
    return task_state

def custom_short_circuit(upstream_task_id, task_ids_to_skip):
    tasks_to_skip = []
    task_state = get_task_state(upstream_task_id)
    if task_state == "failed":
        context = get_current_context()
        for task_id in task_ids_to_skip:
            task = context["dag_run"].get_task_instance(task_id)
            tasks_to_skip.append(task) 
        if tasks_to_skip:
            SkipMixin().skip(dag_run=context['dag_run'], execution_date=context['ti'].execution_date, tasks=tasks_to_skip)

The custom_short_circuit function takes the upstream task as input to determine if the previous task was successful or failed. If this task failed, we want to "trigger" the short circuit and skip downstream tasks. However we do not want to skip all tasks, and the task_ids_to_skip specifies which tasks we want to skip. We read from the DAG context to retrieve the task instances specified by the task ids and then skip them using the SkipMixin function.

We call the custom_short_circuit function using a Python Operator.

short_circuit = PythonOperator(
    task_id="short_circuit",
    provide_context=True,
    python_callable=custom_short_circuit,
    op_kwargs={"upstream_task_id": "first_task", "task_ids_to_skip": ["second_task", "third_task"]},
    trigger_rule="all_done",
    dag=dag
)

The full DAG example of the above code can be found here: https://github.com/sophia-pol/airflow-dags/blob/main/dags/custom-short-circuit.py

When running the DAG linked above we can see the following flow:

The first task fails which triggers the short circuit task. We have specified that we want the second_task and third_task to be skipped, and the final task remains un-skipped and is executed successfully.

0
Subscribe to my newsletter

Read articles from Sophia Polito directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Sophia Polito
Sophia Polito