How to Utilize the Airflow Context in Your DAGs

Sophia Polito
3 min read

What is the Airflow context

The Airflow context is a dictionary-like object that contains variables about the Airflow environment and the currently running DAG run and task. It is useful when you need to access task-level or DAG-level information from inside your tasks.

Some examples of using the context include:

  • Accessing a task’s XCom value

  • Checking the state of a task

  • Manually executing an operator by calling its execute() method with the context

  • Taking an action on a task in custom functionality, e.g. writing a custom short circuit that skips specific tasks when triggered

Common context variables are:

```python
# Access the TaskInstance object (both keys refer to the same object)
context["ti"]
context["task_instance"]

# Read from and write to XCom
context["ti"].xcom_pull
context["ti"].xcom_push

# Get the DAG object
context["dag"]

# Get information about the current DAG run
context["dag_run"]

# Get the state of a specific task, identified by its task_id
context["dag_run"].get_task_instance(task_id).state
```
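To see several of these variables used together, here is a minimal sketch of a hypothetical helper, describe_run, that reads the DAG ID, run ID and task ID. It takes the context as an argument (inside a task you would pass it get_current_context()), which also lets it be tried out with simple stand-in objects that only have these attributes:

```python
from typing import Any, Dict, Mapping


def describe_run(context: Mapping[str, Any]) -> Dict[str, str]:
    """Collect identifying information from an Airflow-style context mapping."""
    return {
        "dag_id": context["dag"].dag_id,      # the DAG object
        "run_id": context["dag_run"].run_id,  # the current DagRun
        "task_id": context["ti"].task_id,     # the running TaskInstance
    }
```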

The full list of context variables is documented in the templates reference: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

The type stub that defines the context keys is in the Airflow source: https://github.com/apache/airflow/blob/main/airflow/utils/context.pyi

How to use the Airflow context

To use the Airflow context, first import get_current_context in your DAG file:

```python
from airflow.operators.python import get_current_context
```

You can then call it from inside a task to retrieve the context. The context is only accessible during task execution; it is not accessible during pre_execute or post_execute. If get_current_context() is called outside of task execution, for example at the top level of a DAG file, an error is raised.

```python
context = get_current_context()
```
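Because get_current_context() raises an error outside task execution, one way to keep helper functions importable and testable is to accept an optional context and only fall back to get_current_context() when none is passed. This is a minimal sketch; resolve_context is a hypothetical helper name, not part of Airflow:

```python
from typing import Any, Mapping, Optional


def resolve_context(context: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
    """Return `context` if given, otherwise the context of the running task."""
    if context is None:
        # Imported lazily: get_current_context() only works while a task is
        # executing, so code called from elsewhere (e.g. unit tests) can pass
        # a context explicitly instead.
        from airflow.operators.python import get_current_context

        context = get_current_context()
    return context
```

A helper such as get_task_state could then begin with context = resolve_context(context), using the live context inside a task and a hand-built mapping in a unit test.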

Airflow context use cases

Current state of a task

One use of the context is to read the current state of a task.

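```python
from airflow.operators.python import get_current_context


def get_task_state(task_id):
    # Only works while a task is executing; raises an error otherwise.
    context = get_current_context()
    # Look up the task instance for task_id in the current DAG run and read its state.
    return context["dag_run"].get_task_instance(task_id).state
```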

Here we import get_current_context and call it to retrieve the context of the running task. We then use context["dag_run"] to look up the task instance for task_id in the current DAG run and return its state. For an example of a full DAG that retrieves the state of a task, see: https://github.com/sophia-pol/airflow-dags/blob/main/dags/airflow-context.py
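To check several tasks at once, the same lookup can be repeated per task_id. This is a minimal sketch; get_task_states is a hypothetical helper that takes the context as an argument (for example get_task_states(get_current_context(), ["extract", "load"]) inside a task, where the task IDs are placeholders) and records None for any task that has no instance in the current run:

```python
from typing import Any, Dict, Iterable, Mapping, Optional


def get_task_states(
    context: Mapping[str, Any], task_ids: Iterable[str]
) -> Dict[str, Optional[str]]:
    """Map each task_id to its state in the current DAG run."""
    dag_run = context["dag_run"]
    states: Dict[str, Optional[str]] = {}
    for task_id in task_ids:
        ti = dag_run.get_task_instance(task_id)
        # get_task_instance() returns None if this DAG run has no such task instance.
        states[task_id] = ti.state if ti is not None else None
    return states
```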

Because get_current_context() retrieves the whole context, it can cause some slight performance overhead, and it is recommended to request only the specific context variables you need, as noted in the TaskFlow documentation: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
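One way to request a specific variable, assuming Airflow 2.x: the PythonOperator (like TaskFlow @task functions) passes in context variables whose names match the callable's parameters, so a callable can ask for dag_run directly instead of calling get_current_context(). A minimal sketch of get_task_state written this way; the ValueError is an illustrative choice for when it is called outside Airflow without a dag_run:

```python
from typing import Any, Optional


def get_task_state(task_id: str, dag_run: Optional[Any] = None) -> Optional[str]:
    """Return the state of task_id in the current DAG run.

    Used as a PythonOperator callable, `dag_run` is filled in by Airflow because
    the parameter name matches a context key, while `task_id` comes from op_kwargs.
    """
    if dag_run is None:
        raise ValueError("dag_run was not provided")
    ti = dag_run.get_task_instance(task_id)
    # None means this DAG run has no instance of task_id.
    return ti.state if ti is not None else None
```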

Retrieving an XCom value

We can retrieve an XCom value pushed by another task using the context.

```python
from airflow.operators.python import get_current_context


def get_xcom_value(task_id):
    context = get_current_context()
    # Pull the value task_id pushed under this key ("your_key_value" is a placeholder).
    json_response = context["task_instance"].xcom_pull(task_ids=task_id, key="your_key_value")
    return json_response
```

Depending on your Airflow setup and on what the upstream task pushed, the returned value may need some post-processing (for example, parsing a JSON string) to extract the specific value you are after.
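As a minimal sketch of that post-processing, assuming the upstream task pushed either a dict or a JSON-encoded object, the hypothetical helper below decodes the value when needed and extracts one field:

```python
import json
from typing import Any


def extract_from_xcom(value: Any, field: str) -> Any:
    """Pull one field out of an XCom value that may be a dict or a JSON string."""
    if value is None:
        return None  # xcom_pull returns None when nothing was pushed under that key
    if isinstance(value, (str, bytes)):
        value = json.loads(value)  # e.g. a raw API response pushed as text
    # Assumes the decoded value is a JSON object (a dict).
    return value.get(field)
```

For example, extract_from_xcom(get_xcom_value("previous_task_id"), "status") would return the "status" field, where "status" is a placeholder key.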

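Passing context to Python operators

In Airflow 1.x you had to set provide_context=True on the PythonOperator so that the context was passed to the Python callable. This argument was removed in Airflow 2.0: the callable's signature is now inspected and matching context variables are passed in automatically, so no extra flag is needed. A callable that uses get_current_context(), such as get_task_state above, also works without it.

```python
from airflow.operators.python import PythonOperator

# `dag` is assumed to be a DAG object defined earlier in this file.
task_state = PythonOperator(
    task_id="get_state_task",
    python_callable=get_task_state,
    # Passed to get_task_state as its task_id argument.
    op_kwargs={"task_id": "previous_task_id"},
    # Run even if upstream tasks failed, so their final state can be read.
    trigger_rule="all_done",
    dag=dag,
)
```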