How to Write Custom Overall DAG Status in Apache Airflow
The DAG status is the overall status of a DAG run, determined once the run has completed. A DAG run completes when all of its tasks are in a terminal state: success, failed, or skipped. The overall status can be either success or failed. The status is success if all leaf nodes are in a success or skipped state; it is failed if any leaf node is in a failed or upstream_failed state.
This means the overall DAG status is based on the status of the final tasks. However, this may not always be desirable, and we may want to base the overall DAG status on the status of earlier tasks instead.
Let's take this Airflow DAG flow as an example:
The first task fails, which triggers a short circuit task that skips the rest of the downstream tasks. Since the status of the last task is skipped, the overall DAG status is Success. However, for logging purposes we may want an overall DAG status of Failed here, to reflect the failure of the first task.
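To make the problem concrete, here is a minimal sketch of such a DAG (the task names, schedule, and wiring are assumptions for illustration; the full example is in the repository linked below):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator


def fail_task():
    # Simulate a failing first task
    raise Exception("first_task failed")


with DAG(
    dag_id="leaf_status_demo",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    first_task = PythonOperator(task_id="first_task", python_callable=fail_task)

    # Runs even though first_task failed, then returns False to skip downstream tasks
    short_circuit = ShortCircuitOperator(
        task_id="short_circuit",
        python_callable=lambda: False,
        trigger_rule="all_done",
    )

    last_task = PythonOperator(task_id="last_task", python_callable=lambda: print("done"))

    first_task >> short_circuit >> last_task

When this runs, the only leaf node (last_task) ends up skipped, so the run is marked Success even though first_task failed.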
We can write a custom final task to enforce an overall DAG status of Failed whenever any upstream task fails. The specific DAG status code segments are discussed in this article, and you can find an example of a complete DAG with a custom final status task here: https://github.com/sophia-pol/airflow-dags/blob/main/dags/overall-dag-status.py
We start by creating an overall_dag_status function to use in our final task.
from airflow.operators.python import get_current_context

def overall_dag_status(task_list):
    context = get_current_context()
    for task_id in task_list:
        # Look up the state of each monitored task in the current DAG run
        state = context["dag_run"].get_task_instance(task_id).state
        if state == "failed":
            raise Exception("Task {} failed. Failing this DAG run".format(task_id))
And we call the above function in a PythonOperator task.
from airflow.operators.python import PythonOperator

overall_status = PythonOperator(
    task_id="overall_status",
    python_callable=overall_dag_status,
    op_kwargs={"task_list": []},  # fill in with the task_ids to monitor
    trigger_rule="all_done",  # run even if upstream tasks failed or were skipped
    dag=dag,
)
The overall_dag_status function iterates through the task IDs in the list and checks each task's state in the run context. If any of the tasks has a failed status, we raise an exception. We place this task at the end of our task flow so it can see the statuses of all previous tasks. When the exception is raised, it fails the leaf node task, resulting in the overall DAG status being failed.
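Putting it together, the wiring at the end of the DAG might look like this (the task names first_task and next_task are assumptions for illustration; the linked example shows the actual layout):

# task_list in op_kwargs above would list the tasks to monitor,
# e.g. ["first_task", "next_task"].
first_task >> short_circuit >> next_task >> overall_status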
When running the full DAG example containing the overall DAG status task (https://github.com/sophia-pol/airflow-dags/blob/main/dags/overall-dag-status.py), our task flow is as follows:
The first task fails, which triggers the short circuit. The short circuit skips the next task, and the overall_dag_status task runs. This task detects the earlier failure of first_task, and the exception is raised. The exception fails the final task, resulting in an overall DAG status of Failed.
To get the above flow working, you may notice the linked code contains a custom short circuit task. This prevents the overall_dag_status task from also being skipped when the short circuit is triggered. For more information on the custom short circuit task, see here: https://sophiapol.hashnode.dev/enhancing-airflow-dags-with-custom-short-circuit-operators
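As an aside: if you are on Airflow 2.3 or newer, the built-in ShortCircuitOperator has an ignore_downstream_trigger_rules argument that can achieve a similar effect. Setting it to False makes the skip respect downstream trigger rules, so a final task with trigger_rule="all_done" still runs. A minimal sketch:

short_circuit = ShortCircuitOperator(
    task_id="short_circuit",
    python_callable=lambda: False,
    trigger_rule="all_done",
    # With ignore_downstream_trigger_rules=False, the skip stops at tasks whose
    # trigger rules are satisfied, so overall_status (trigger_rule="all_done")
    # can still run after the short circuit fires.
    ignore_downstream_trigger_rules=False,
)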