How to Utilize the Airflow Context in Your DAGs
What is the Airflow context
The Airflow context is a dictionary that contains variables describing the Airflow environment and the currently running DAG run and task. The context is useful when you need to access task-level or DAG-level information in your DAGs.
Some examples of using the context include:
Accessing a task's XCom value
Viewing the state of a task
Manually executing a task with its execute() method, which takes the context as an argument
Taking an action on a task in custom functionality, e.g. writing a custom short circuit that skips specific tasks when triggered
Common context variables are:
# Access the TaskInstance object
context["ti"]
context["task_instance"]
# Read from and write to XCom
context["ti"].xcom_pull
context["ti"].xcom_push
# Get the DAG object
context["dag"]
# Get information about the current DAG run
context["dag_run"]
# Get the state of a specific task by <task_id>
context["dag_run"].get_task_instance(task_id).state
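To make the shape of these lookups concrete, here is a minimal sketch of a callable that reads a few common context variables. It takes the context as a plain argument so it runs without Airflow installed; the `describe_run` name and the `SimpleNamespace` stand-ins are hypothetical and only mimic the attributes read here (`dag.dag_id`, `dag_run.run_id`, `ti.task_id`).

```python
from types import SimpleNamespace


def describe_run(context):
    """Summarise where the current task is running, using common context keys.

    In a real task, `context` would come from get_current_context();
    here it is any dict exposing the same keys and attributes.
    """
    ti = context["ti"]  # the TaskInstance (also available as context["task_instance"])
    return {
        "dag_id": context["dag"].dag_id,
        "run_id": context["dag_run"].run_id,
        "task_id": ti.task_id,
    }


# Hypothetical stand-ins that only mimic the attributes used above.
fake_context = {
    "dag": SimpleNamespace(dag_id="example_dag"),
    "dag_run": SimpleNamespace(run_id="manual__2024-01-01T00:00:00"),
    "ti": SimpleNamespace(task_id="get_state_task"),
}
print(describe_run(fake_context))
```

Passing the context in explicitly keeps the lookup logic separate from how the context is obtained, which makes it easy to exercise with fake objects.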
The full list of context variables is documented in the templates reference and in the type stub for the context:
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
https://github.com/apache/airflow/blob/main/airflow/utils/context.pyi
How to use the Airflow context
To use the Airflow context, first import the get_current_context function into your DAG file:
from airflow.operators.python import get_current_context
You can then call get_current_context() inside the code a task runs. The context is only accessible during task execution; it is not accessible during pre_execute or post_execute. If the function is called outside of task execution, an error is raised.
context = get_current_context()
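Because the context only exists while a task is executing, one way to keep helper functions testable is to accept an optional context and fall back to get_current_context() only when none is passed. This is a sketch of a general pattern, not something Airflow requires; `resolve_context` and `get_ds` are hypothetical helper names, and the fallback import assumes Airflow 2.x.

```python
def resolve_context(context=None):
    """Return the given context, or the running task's context if none is given.

    Passing a context explicitly lets helpers run in unit tests,
    where no Airflow task is executing.
    """
    if context is not None:
        return context
    # Imported lazily so this module can be loaded without Airflow installed.
    from airflow.operators.python import get_current_context

    # Raises an error if called outside of task execution.
    return get_current_context()


def get_ds(context=None):
    """Return the run's date stamp ("ds" context variable, YYYY-MM-DD)."""
    return resolve_context(context)["ds"]


print(get_ds({"ds": "2024-01-01"}))
```

Inside a running task you would call `get_ds()` with no argument; in a test you pass a plain dict.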
Airflow context use cases
Current state of task
One use of the context is to read the current state of a task.
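from airflow.operators.python import get_current_context

def get_task_state(task_id):
    # Only works while a task is executing
    context = get_current_context()
    # Find task_id's task instance in the current DAG run and return its state
    return context["dag_run"].get_task_instance(task_id).state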
Here we import get_current_context and call it to retrieve the context of the running task. From the context we take the DAG run, look up the task instance for task_id in that run, and return its state. For a full example DAG that retrieves the state of a task, see: https://github.com/sophia-pol/airflow-dags/blob/main/dags/airflow-context.py
Calling get_current_context() retrieves the whole context, which may cause some slight performance overhead. The TaskFlow documentation instead describes requesting only the specific context variables you need, for example as keyword arguments of your task function: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
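Requesting only the variable you need can be sketched as follows, assuming Airflow 2.x, where the PythonOperator passes a context variable to the callable when one of its parameter names matches a context key. Here the callable declares only `dag_run` instead of fetching the whole context. `get_task_state_explicit` and `FakeDagRun` are hypothetical; the fake only mimics `get_task_instance()` so the sketch runs without Airflow.

```python
from types import SimpleNamespace


def get_task_state_explicit(task_id, dag_run=None):
    """Return the state of task_id in the current DAG run.

    With Airflow 2.x's PythonOperator, `dag_run` is filled in from the
    context because the parameter name matches a context key; `task_id`
    would come from op_kwargs.
    """
    task_instance = dag_run.get_task_instance(task_id)
    # get_task_instance returns None if the task is not part of this run.
    return None if task_instance is None else task_instance.state


class FakeDagRun:
    """Hypothetical stand-in exposing only get_task_instance()."""

    def __init__(self, states):
        self._states = states

    def get_task_instance(self, task_id):
        state = self._states.get(task_id)
        return None if state is None else SimpleNamespace(state=state)


print(get_task_state_explicit("previous_task_id", dag_run=FakeDagRun({"previous_task_id": "success"})))
```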
Retrieving xcom value
We can retrieve an XCom value pushed by a task using the context.
from airflow.operators.python import get_current_context

def get_xcom_value(task_id):
    context = get_current_context()
    # Pull the value that task_id pushed under the given key
    json_response = context["task_instance"].xcom_pull(task_ids=task_id, key="your_key_value")
    return json_response
Depending on your Airflow setup and on what the upstream task pushed, the returned value may need some pre-processing (for example, parsing a JSON string) to retrieve the specific value you are after.
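A minimal sketch of such pre-processing, assuming the pulled value is either an already-deserialised dict or a string containing a JSON object. `extract_xcom_field` and the `"status"` field are hypothetical names used only for illustration.

```python
import json


def extract_xcom_field(xcom_value, field, default=None):
    """Pull one field out of an XCom value that may be a dict or a JSON string.

    Assumes the value (after decoding) is a JSON object, i.e. a dict.
    """
    if xcom_value is None:
        # xcom_pull returns None when nothing was pushed under that key.
        return default
    if isinstance(xcom_value, (str, bytes)):
        xcom_value = json.loads(xcom_value)
    return xcom_value.get(field, default)


print(extract_xcom_field('{"status": "ok", "count": 3}', "status"))
print(extract_xcom_field({"status": "ok"}, "count", default=0))
```

In a task you would pass the result of `get_xcom_value(...)` as `xcom_value`.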
Passing context to Python operators
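In Airflow 1.x, you had to set provide_context=True on the PythonOperator for the context to be passed to the Python callable. In Airflow 2, where get_current_context is available, provide_context was removed and passing it raises an error: the context is made available automatically, so the callable can call get_current_context() (as get_task_state does) or accept context variables as keyword arguments.
from airflow.operators.python import PythonOperator

task_state = PythonOperator(
    task_id="get_state_task",
    python_callable=get_task_state,
    # Passed to get_task_state as its task_id argument
    op_kwargs={"task_id": "previous_task_id"},
    # Run once upstream tasks finish, whatever their outcome
    trigger_rule="all_done",
    dag=dag,  # assumes a DAG object named dag defined earlier in the file
)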
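As a further sketch, assuming Airflow 2.x: a callable that accepts `**context` receives every context variable from the PythonOperator alongside the op_kwargs entries, so it can both read the DAG run and write to XCom through `ti`. `record_task_state`, the `"upstream_state"` key, and the fake classes are hypothetical; the fakes only mimic `get_task_instance()` and `xcom_push(key=..., value=...)` so the sketch runs without Airflow.

```python
from types import SimpleNamespace


def record_task_state(task_id, **context):
    """Look up task_id's state and push it to XCom under a hypothetical key.

    In Airflow 2.x, PythonOperator passes all context variables to a
    callable that accepts **kwargs, alongside entries from op_kwargs.
    """
    task_instance = context["dag_run"].get_task_instance(task_id)
    state = None if task_instance is None else task_instance.state
    context["ti"].xcom_push(key="upstream_state", value=state)
    return state


class FakeTI:
    """Hypothetical stand-in that records xcom_push calls in a dict."""

    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


class FakeDagRun:
    """Hypothetical stand-in exposing only get_task_instance()."""

    def __init__(self, states):
        self._states = states

    def get_task_instance(self, task_id):
        state = self._states.get(task_id)
        return None if state is None else SimpleNamespace(state=state)


ti = FakeTI()
state = record_task_state(
    "previous_task_id",
    dag_run=FakeDagRun({"previous_task_id": "failed"}),
    ti=ti,
)
print(state, ti.pushed)
```

With trigger_rule="all_done", a task like this still runs when the upstream task fails, which is what makes reading a "failed" state useful.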
Written by Sophia Polito