20 Airflow concepts with Before-and-After Examples

Anix LynchAnix Lynch
11 min read

1. Installing Airflow via pip ๐Ÿ› ๏ธ

Boilerplate Code:

pip install apache-airflow

Use Case: Install Apache Airflow to automate workflows and tasks.

Goal: Set up Airflow for task automation on your local environment. ๐ŸŽฏ

Sample Code:

pip install apache-airflow

Before Example: You manually manage task workflows and dependencies using scripts and cron jobs.

python my_workflow.py
# Cron jobs are used for periodic task execution.

After Example: Once Airflow is installed, you can run workflows automatically and manage tasks through the Airflow interface.

Successfully installed apache-airflow
# Airflow is now installed and ready to automate task workflows.

2. Starting the Airflow Web Server ๐ŸŒ

Boilerplate Code:

airflow webserver -p 8080

Use Case: Start the Airflow web UI to manage and monitor workflows.

Goal: Access the Airflow web interface to view, schedule, and control tasks. ๐ŸŽฏ

Sample Code:

airflow webserver -p 8080

Before Example: You manually check logs and monitor scripts in your terminal without a visual interface.

tail -f workflow.log  # Manually checking task status through logs.

After Example: Once the web server starts, you can monitor and manage tasks through a browser.

Starting the Airflow webserver on port 8080...
# Navigate to http://localhost:8080 to view and manage tasks.

3. Initializing the Airflow Database ๐Ÿ—„๏ธ

Boilerplate Code:

airflow db init

Use Case: Set up the metadata database for Airflow.

Goal: Initialize the database to track DAGs (Directed Acyclic Graphs), tasks, and other metadata. ๐ŸŽฏ

Sample Code:

airflow db init

Before Example: You manually track workflow tasks and dependencies using external tools like spreadsheets.

# Spreadsheet with tasks and their statuses.
# Manually updated for tracking.

After Example: Once the Airflow database is initialized, DAGs and task metadata are automatically stored and managed.

Database initialized successfully.
# Workflow and task information is now automatically stored in Airflow's database.

4. Creating a New DAG (Directed Acyclic Graph) ๐Ÿ—‚๏ธ

Boilerplate Code:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

dag = DAG('example_dag', start_date=datetime(2023, 1, 1))
task = DummyOperator(task_id='dummy_task', dag=dag)

Use Case: Define a DAG to represent a set of tasks with dependencies.

Goal: Create a simple Airflow DAG to automate a sequence of tasks. ๐ŸŽฏ

Sample Code:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

dag = DAG('example_dag', start_date=datetime(2023, 1, 1))
task = DummyOperator(task_id='dummy_task', dag=dag)

Before Example: You manually run tasks in a specific order, keeping track of dependencies yourself.

python task1.py
python task2.py
# You need to ensure that tasks run in the right order manually.

After Example: With Airflow, the DAG defines task dependencies and execution order.

DAG created: example_dag
# Tasks are now automated within the DAG, running according to the defined order and dependencies.

5. Scheduling a Task with a Cron Expression ๐Ÿ•’

Boilerplate Code:

dag = DAG(
    default_args={'owner': 'airflow', 'start_date': datetime(2023, 1, 1)},
    schedule_interval='0 12 * * *',  # Runs daily at noon

Use Case: Schedule tasks in Airflow using cron-like expressions.

Goal: Automatically run tasks on a specified schedule. ๐ŸŽฏ

Sample Code:

dag = DAG(
    default_args={'owner': 'airflow', 'start_date': datetime(2023, 1, 1)},
    schedule_interval='0 12 * * *',  # Runs daily at noon

Before Example: You manually run tasks or schedule them via external cron jobs.

# Cron job scheduled in the system:
0 12 * * * /usr/bin/python3 /path/to/task.py

After Example: Airflow runs the task automatically every day at noon based on the schedule defined in the DAG.

Task scheduled successfully: scheduled_dag
# Task is now set to run daily at noon without manual intervention.

6. Defining Task Dependencies in a DAG ๐Ÿ”—

Boilerplate Code:

task1 >> task2  # task2 runs after task1

Use Case: Ensure tasks run in a specific order within a DAG.

Goal: Define the order of execution for tasks in a workflow. ๐ŸŽฏ

Sample Code:

task1 = DummyOperator(task_id='start', dag=dag)
task2 = DummyOperator(task_id='end', dag=dag)
task1 >> task2  # task2 runs after task1

Before Example: Tasks are executed in random order or require manual management of dependencies.

# Running scripts manually in a specific order:
python script1.py
python script2.py

After Example: Airflow ensures tasks are executed in the defined sequence automatically.

# Task dependency set: task1 will run before task2.
Task: start -> end
# Airflow handles the execution order automatically.

7. Using Task Retries in Airflow ๐Ÿ”„

Boilerplate Code:

default_args = {'retries': 3}

Use Case: Automatically retry failed tasks in Airflow.

Goal: Set up automatic retries for tasks that fail. ๐ŸŽฏ

Sample Code:

default_args = {
    'owner': 'airflow',
    'retries': 3,  # Retry failed tasks up to 3 times
dag = DAG('retry_dag', default_args=default_args, start_date=datetime(2023, 1, 1))

Before Example: You manually rerun scripts or tasks after they fail.

# If task fails, manually run it again:
python task.py

After Example: Airflow automatically retries the failed task up to the specified number of times.

Task failed, retrying... (Attempt 1 of 3)
Task completed successfully after retry.

8. Branching in Airflow (Conditional Tasks) ๐ŸŒฟ

Boilerplate Code:

branch_task = BranchPythonOperator(

Use Case: Execute tasks conditionally based on the outcome of previous tasks.

Goal: Use branching to control the flow of task execution based on conditions. ๐ŸŽฏ

Sample Code:

def choose_branch():
    if condition:
        return 'task1'
        return 'task2'

branch_task = BranchPythonOperator(
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
branch_task >> [task1, task2]

Before Example: You manually manage conditions and execute the appropriate tasks based on logic.

if condition:
    python task1.py
    python task2.py

After Example: Airflow automatically chooses which tasks to execute based on the branching logic.

Condition met, branching to task1.
# Airflow handles the branching and executes the correct tasks.

9. Setting Task Timeouts in Airflow โณ

Boilerplate Code:

default_args = {'execution_timeout': timedelta(minutes=5)}

Use Case: Limit the execution time of tasks to prevent long-running operations.

Goal: Set timeouts for tasks to avoid hanging operations. ๐ŸŽฏ

Sample Code:

default_args = {
    'owner': 'airflow',
    'execution_timeout': timedelta(minutes=5),  # Task must complete within 5 minutes
dag = DAG('timeout_dag', default_args=default_args, start_date=datetime(2023, 1, 1))

Before Example: You manually monitor tasks and stop them if they run too long.

# Manually check and kill long-running tasks:
kill -9 <task_pid>

After Example: Airflow automatically stops tasks that exceed the specified timeout.

Task exceeded timeout of 5 minutes, terminating.
# Task is automatically stopped after 5 minutes if not completed.

10. Triggering DAGs Manually ๐Ÿ“…

Boilerplate Code:

airflow dags trigger <dag_id>

Use Case: Trigger a DAG to run outside of its regular schedule.

Goal: Manually trigger a DAG when needed. ๐ŸŽฏ

Sample Code:

airflow dags trigger my_dag

Before Example: You run the script manually if you need to rerun a workflow outside of its schedule.

python my_workflow.py

After Example: Airflow allows you to trigger the DAG manually from the command line.

DAG triggered: my_dag
# DAG execution started manually outside of the regular schedule.

11. Pausing and Unpausing a DAG ๐Ÿ›‘โ–ถ๏ธ

Boilerplate Code:

airflow dags pause <dag_id>
airflow dags unpause <dag_id>

Use Case: Temporarily stop a DAG from executing or resume its execution.

Goal: Pause or unpause a DAG without deleting it. ๐ŸŽฏ

Sample Code:

airflow dags pause my_dag
airflow dags unpause my_dag

Before Example: You manually stop tasks by interrupting them or shutting down servers.

# Manually stop tasks:
kill -9 <pid>

After Example: Airflow allows you to pause and unpause DAGs without disrupting the system.

DAG paused: my_dag
DAG unpaused: my_dag
# DAG execution paused and resumed without deletion.

12. Backfilling DAG Runs ๐Ÿ“†

Boilerplate Code:

airflow dags backfill <dag_id> -s <start_date> -e <end_date>

Use Case: Re-run missed DAG executions over a specific time range.

Goal: Backfill DAG runs that were missed or failed. ๐ŸŽฏ

Sample Code:

airflow dags backfill my_dag -s 2023-01-01 -e 2023-01-07

Before Example: You manually rerun missed tasks for specific dates by rescheduling or manually executing them.

# Manually run missed tasks:
python my_task.py --date 2023-01-01

After Example: Airflow automatically backfills missed runs for the specified dates.

Backfilling DAG: my_dag from 2023-01-01 to 2023-01-07
# DAG runs for the specified dates are backfilled successfully.

13. Triggering a Task from Another Task ๐Ÿ”—

Boilerplate Code:

trigger_task = TriggerDagRunOperator(

Use Case: Trigger a different DAG as part of the execution of the current DAG.

Goal: Chain workflows by triggering one DAG from another. ๐ŸŽฏ

Sample Code:

trigger_task = TriggerDagRunOperator(

Before Example: You manually execute multiple workflows one after another without automated dependency management.

python workflow1.py
python workflow2.py
# Manually chaining workflows.

After Example: Airflow automatically triggers the second DAG as part of the first DAGโ€™s workflow.

Task triggered DAG: other_dag
# The second DAG is triggered automatically after the first one completes.

14. Email Alerts for Task Failures ๐Ÿ“ง

Boilerplate Code:

default_args = {
    'email': ['your_email@example.com'],
    'email_on_failure': True

Use Case: Send an email notification if a task fails.

Goal: Automatically receive an email when a task fails. ๐ŸŽฏ

Sample Code:

default_args = {
    'owner': 'airflow',
    'email': ['your_email@example.com'],
    'email_on_failure': True,
    'start_date': datetime(2023, 1, 1),
dag = DAG('alert_dag', default_args=default_args)

Before Example: You manually monitor the logs to check if tasks failed, without any automatic alert system.

# Check logs manually to see if a task failed.
tail -f /var/logs/task.log

After Example: Airflow automatically sends an email notification if a task fails.

Task failed, sending email to: your_email@example.com
# You receive an email alert when a task fails.

15. Setting Task Priority Weight ๐Ÿ‹๏ธ

Boilerplate Code:

task = DummyOperator(

Use Case: Assign priority to specific tasks to determine execution order.

Goal: Prioritize certain tasks over others in a DAG. ๐ŸŽฏ

Sample Code:

task1 = DummyOperator(task_id='low_priority_task', priority_weight=5, dag=dag)
task2 = DummyOperator(task_id='high_priority_task', priority_weight=10, dag=dag)

Before Example: Tasks are executed in the order they are added, without any notion of priority.

# Manually manage task execution order.
python task1.py
python task2.py

After Example: Airflow automatically prioritizes tasks with higher weights, executing them first.

Executing high_priority_task (weight: 10) before low_priority_task (weight: 5)
# Tasks are executed in order of their priority weights.

16. Clearing Task Instances in a DAG ๐Ÿงน

Boilerplate Code:

airflow tasks clear <dag_id> --start-date <date> --end-date <date>

Use Case: Clear the status of tasks that have already been executed in a DAG.

Goal: Reset tasks so that they can be re-executed. ๐ŸŽฏ

Sample Code:

airflow tasks clear my_dag --start-date 2023-01-01 --end-date 2023-01-07

Before Example: You manually delete logs or reset status flags in the database for re-executing tasks.

# Manually clear task states by modifying logs:
rm -rf /var/logs/my_task/

After Example: Airflow clears the status of the selected task instances, making them ready for re-execution.

Clearing tasks from 2023-01-01 to 2023-01-07 for DAG: my_dag
# Tasks are cleared and ready for rerun.

17. Running Airflow in Sequential Executor Mode ๐Ÿ› ๏ธ

Boilerplate Code:

airflow scheduler

Use Case: Run tasks one at a time in a lightweight environment.

Goal: Use Airflow's SequentialExecutor to run tasks in sequence with limited resources. ๐ŸŽฏ

Sample Code:

airflow scheduler

Before Example: You manage multiple tasks on different systems without centralized scheduling.

# Manually execute tasks one at a time:
python task1.py
python task2.py

After Example: Airflow schedules tasks in sequence, managing them in the correct order automatically.

Scheduler running with SequentialExecutor.
# Tasks are executed one after the other in a lightweight mode.

18. Using ExternalTaskSensor to Wait for External DAGs โณ

Boilerplate Code:


Use Case: Wait for a task in an external DAG to complete before running tasks in the current DAG.

Goal: Synchronize tasks across different DAGs. ๐ŸŽฏ

Sample Code:

wait_task = ExternalTaskSensor(

Before Example: You manually monitor the status of external tasks and trigger dependent workflows yourself.

# Check manually if external tasks are complete:
python check_status.py

After Example: Airflow automatically waits for the external task to complete before executing the dependent task.

Waiting for external task to complete: external_task
# Dependent task runs only after the external task completes.

19. SubDAGs (Nested DAGs) ๐Ÿงฉ

Boilerplate Code:

subdag_task = SubDagOperator(

Use Case: Break down complex workflows into smaller, reusable components (SubDAGs).

Goal: Create nested workflows that can be reused across different DAGs. ๐ŸŽฏ

Sample Code:

subdag_task = SubDagOperator(
    subdag=create_subdag('main_dag', 'my_subdag', default_args),

Before Example: You create large, complex workflows where all tasks are defined in a single file, making them harder to manage.

# All tasks in one big script:
python workflow.py

After Example: With SubDAGs, you can break the workflow into smaller pieces that can be managed and reused.

SubDAG created: my_subdag
# The complex workflow is split into more manageable, smaller workflows.

20. XComs for Task Communication ๐Ÿ“จ

Boilerplate Code:

task.xcom_push(key='message', value='Hello from task 1')
task.xcom_pull(task_ids='task1', key='message')

Use Case: Share data between tasks using XComs.

Goal: Pass information from one task to another in a DAG. ๐ŸŽฏ

Sample Code:

# In task 1
task1.xcom_push(key='message', value='Hello from task 1')

# In task 2
message = task2.xcom_pull(task_ids='task1', key='message')

Before Example: You manually pass data between tasks by writing it to files or databases.

# Save output to a file:
echo "Hello from task 1" > message.txt
# Read output in another script:
cat message.txt

After Example: With Airflow XComs, you can directly pass data between tasks without needing external storage.

XCom pushed: 'Hello from task 1'
XCom pulled: 'Hello from task 1'
# Data shared between tasks using Airflow XComs.
Subscribe to my newsletter

Read articles from Anix Lynch directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Anix Lynch
Anix Lynch