20 Airflow Concepts with Before-and-After Examples

Anix Lynch
11 min read

1. Installing Airflow via pip 🛠️

Boilerplate Code:

pip install apache-airflow

Use Case: Install Apache Airflow to automate workflows and tasks.

Goal: Set up Airflow for task automation in your local environment. 🎯

Sample Code:

pip install apache-airflow

Before Example: You manually manage task workflows and dependencies using scripts and cron jobs.

python my_workflow.py
# Cron jobs are used for periodic task execution.

After Example: Once Airflow is installed, you can run workflows automatically and manage tasks through the Airflow interface.

Successfully installed apache-airflow
# Airflow is now installed and ready to automate task workflows.
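A bare pip install can pull in incompatible dependency versions, so the Airflow docs recommend installing against a constraints file. A minimal sketch, assuming Airflow 2.7.3 on Python 3.8 (substitute your own versions):

AIRFLOW_VERSION=2.7.3
PYTHON_VERSION=3.8
pip install "apache-airflow==${AIRFLOW_VERSION}" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"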

2. Starting the Airflow Web Server 🌐

Boilerplate Code:

airflow webserver -p 8080

Use Case: Start the Airflow web UI to manage and monitor workflows.

Goal: Access the Airflow web interface to view, schedule, and control tasks. 🎯

Sample Code:

airflow webserver -p 8080

Before Example: You manually check logs and monitor scripts in your terminal without a visual interface.

tail -f workflow.log  # Manually checking task status through logs.

After Example: Once the web server starts, you can monitor and manage tasks through a browser.

Starting the Airflow webserver on port 8080...
# Navigate to http://localhost:8080 to view and manage tasks.
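The web UI requires a login, and a fresh install has no user yet. A sketch for creating an admin account and running the scheduler alongside the webserver (the values after the flags are placeholders):

airflow users create --username admin --firstname Admin --lastname User \
    --role Admin --email admin@example.com --password admin
airflow scheduler  # run in a second terminal; the webserver alone does not execute tasks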

3. Initializing the Airflow Database 🗄️

Boilerplate Code:

airflow db init

Use Case: Set up the metadata database for Airflow.

Goal: Initialize the database to track DAGs (Directed Acyclic Graphs), tasks, and other metadata. 🎯

Sample Code:

airflow db init

Before Example: You manually track workflow tasks and dependencies using external tools like spreadsheets.

# Spreadsheet with tasks and their statuses.
# Manually updated for tracking.

After Example: Once the Airflow database is initialized, DAGs and task metadata are automatically stored and managed.

Database initialized successfully.
# Workflow and task information is now automatically stored in Airflow's database.
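By default, airflow db init creates a SQLite database at ~/airflow/airflow.db, which is fine for local testing. For anything heavier, point Airflow at a real database before initializing; a sketch assuming Airflow 2.3+ (where the setting lives under the database config section) with placeholder Postgres credentials:

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"
airflow db init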

4. Creating a New DAG (Directed Acyclic Graph) 🗂️

Boilerplate Code:

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

dag = DAG('example_dag', start_date=datetime(2023, 1, 1))
task = EmptyOperator(task_id='empty_task', dag=dag)

Use Case: Define a DAG to represent a set of tasks with dependencies.

Goal: Create a simple Airflow DAG to automate a sequence of tasks. 🎯

Sample Code:

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

dag = DAG('example_dag', start_date=datetime(2023, 1, 1))
task = EmptyOperator(task_id='empty_task', dag=dag)

Before Example: You manually run tasks in a specific order, keeping track of dependencies yourself.

python task1.py
python task2.py
# You need to ensure that tasks run in the right order manually.

After Example: With Airflow, the DAG defines task dependencies and execution order.

DAG created: example_dag
# Tasks are now automated within the DAG, running according to the defined order and dependencies.
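In Airflow 2, the context-manager style is the more idiomatic way to write the same thing, since every operator created inside the with block attaches to the DAG automatically; a sketch:

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(
    dag_id='example_dag_v2',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,  # don't backfill runs between start_date and now
) as dag:
    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end')
    start >> end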

5. Scheduling a Task with a Cron Expression 🕒

Boilerplate Code:

dag = DAG(
    'scheduled_dag',
    default_args={'owner': 'airflow', 'start_date': datetime(2023, 1, 1)},
    schedule_interval='0 12 * * *',  # Runs daily at noon
)

Use Case: Schedule tasks in Airflow using cron-like expressions.

Goal: Automatically run tasks on a specified schedule. 🎯

Sample Code:

dag = DAG(
    'scheduled_dag',
    default_args={'owner': 'airflow', 'start_date': datetime(2023, 1, 1)},
    schedule_interval='0 12 * * *',  # Runs daily at noon
)

Before Example: You manually run tasks or schedule them via external cron jobs.

# Cron job scheduled in the system:
0 12 * * * /usr/bin/python3 /path/to/task.py

After Example: Airflow runs the task automatically every day at noon based on the schedule defined in the DAG.

Task scheduled successfully: scheduled_dag
# Task is now set to run daily at noon without manual intervention.
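Instead of raw cron strings, schedule_interval also accepts Airflow's preset aliases ('@hourly', '@daily', '@weekly', '@monthly', '@yearly', '@once'); a sketch with the equivalent cron in comments:

dag_hourly = DAG('hourly_dag', start_date=datetime(2023, 1, 1), schedule_interval='@hourly')  # 0 * * * *
dag_daily = DAG('daily_dag', start_date=datetime(2023, 1, 1), schedule_interval='@daily')     # 0 0 * * *
dag_weekly = DAG('weekly_dag', start_date=datetime(2023, 1, 1), schedule_interval='@weekly')  # 0 0 * * 0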

6. Defining Task Dependencies in a DAG 🔗

Boilerplate Code:

task1 >> task2  # task2 runs after task1

Use Case: Ensure tasks run in a specific order within a DAG.

Goal: Define the order of execution for tasks in a workflow. 🎯

Sample Code:

task1 = EmptyOperator(task_id='start', dag=dag)
task2 = EmptyOperator(task_id='end', dag=dag)
task1 >> task2  # task2 runs after task1

Before Example: Nothing enforces execution order; you track dependencies yourself and run scripts in the right sequence.

# Running scripts manually in a specific order:
python script1.py
python script2.py

After Example: Airflow ensures tasks are executed in the defined sequence automatically.

# Task dependency set: task1 will run before task2.
Task: start -> end
# Airflow handles the execution order automatically.
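The >> operator also accepts lists, so fan-out and fan-in dependencies are one line; a sketch reusing the dag object from above:

start = EmptyOperator(task_id='start', dag=dag)
extract_a = EmptyOperator(task_id='extract_a', dag=dag)
extract_b = EmptyOperator(task_id='extract_b', dag=dag)
join = EmptyOperator(task_id='join', dag=dag)

start >> [extract_a, extract_b] >> join  # fan out to a and b, then fan back in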

7. Using Task Retries in Airflow 🔄

Boilerplate Code:

default_args = {'retries': 3}

Use Case: Automatically retry failed tasks in Airflow.

Goal: Set up automatic retries for tasks that fail. 🎯

Sample Code:

default_args = {
    'owner': 'airflow',
    'retries': 3,  # Retry failed tasks up to 3 times
}
dag = DAG('retry_dag', default_args=default_args, start_date=datetime(2023, 1, 1))

Before Example: You manually rerun scripts or tasks after they fail.

# If task fails, manually run it again:
python task.py

After Example: Airflow automatically retries the failed task up to the specified number of times.

Task failed, retrying... (Attempt 1 of 3)
Task completed successfully after retry.
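Retries are usually combined with a delay between attempts; a sketch adding a fixed delay plus optional exponential backoff, all standard task arguments:

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 3,
    'retry_delay': timedelta(minutes=2),       # wait 2 minutes between attempts
    'retry_exponential_backoff': True,         # 2 min, then 4, then 8, ...
    'max_retry_delay': timedelta(minutes=30),  # cap the backoff
}
dag = DAG('retry_backoff_dag', default_args=default_args, start_date=datetime(2023, 1, 1))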

8. Branching in Airflow (Conditional Tasks) 🌿

Boilerplate Code:

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    dag=dag
)

Use Case: Execute tasks conditionally based on the outcome of previous tasks.

Goal: Use branching to control the flow of task execution based on conditions. 🎯

Sample Code:

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_branch():
    condition = True  # placeholder: any logic that decides which branch to take
    if condition:
        return 'task1'
    return 'task2'  # return the task_id of the branch to follow

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    dag=dag
)
task1 = EmptyOperator(task_id='task1', dag=dag)
task2 = EmptyOperator(task_id='task2', dag=dag)
branch_task >> [task1, task2]

Before Example: You manually manage conditions and execute the appropriate tasks based on logic.

if condition:
    python task1.py
else:
    python task2.py

After Example: Airflow automatically chooses which tasks to execute based on the branching logic.

Condition met, branching to task1.
# Airflow handles the branching and executes the correct tasks.
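One caveat: a task downstream of both branches is skipped by default, because the unchosen branch propagates its skipped state. A join task therefore usually needs a non-default trigger rule; a sketch (this rule name exists in Airflow 2.3+, older releases call it 'none_failed_or_skipped'):

join = EmptyOperator(
    task_id='join',
    trigger_rule='none_failed_min_one_success',  # run if at least one branch succeeded
    dag=dag
)
[task1, task2] >> join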

9. Setting Task Timeouts in Airflow ⏳

Boilerplate Code:

default_args = {'execution_timeout': timedelta(minutes=5)}

Use Case: Limit the execution time of tasks to prevent long-running operations.

Goal: Set timeouts for tasks to avoid hanging operations. 🎯

Sample Code:

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'execution_timeout': timedelta(minutes=5),  # each task must complete within 5 minutes
}
dag = DAG('timeout_dag', default_args=default_args, start_date=datetime(2023, 1, 1))

Before Example: You manually monitor tasks and stop them if they run too long.

# Manually check and kill long-running tasks:
kill -9 <task_pid>

After Example: Airflow automatically stops tasks that exceed the specified timeout.

Task exceeded timeout of 5 minutes, terminating.
# Task is automatically stopped after 5 minutes if not completed.
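The timeout can also be set on an individual task, overriding default_args, which helps when only one step risks hanging; a sketch where the sleep command stands in for slow work:

from airflow.operators.bash import BashOperator
from datetime import timedelta

slow_task = BashOperator(
    task_id='slow_task',
    bash_command='sleep 600',                # stand-in for a long-running step
    execution_timeout=timedelta(minutes=5),  # overrides any default_args value
    dag=dag
)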

10. Triggering DAGs Manually 📅

Boilerplate Code:

airflow dags trigger <dag_id>

Use Case: Trigger a DAG to run outside of its regular schedule.

Goal: Manually trigger a DAG when needed. 🎯

Sample Code:

airflow dags trigger my_dag

Before Example: You run the script manually if you need to rerun a workflow outside of its schedule.

python my_workflow.py

After Example: Airflow allows you to trigger the DAG manually from the command line.

DAG triggered: my_dag
# DAG execution started manually outside of the regular schedule.
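A manual trigger can also carry a JSON payload that tasks read at runtime; a sketch:

airflow dags trigger my_dag --conf '{"run_mode": "manual", "limit": 100}'
# Inside a task, the payload is available as dag_run.conf
# (e.g. {{ dag_run.conf["run_mode"] }} in templated fields).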

11. Pausing and Unpausing a DAG 🛑▶️

Boilerplate Code:

airflow dags pause <dag_id>
airflow dags unpause <dag_id>

Use Case: Temporarily stop a DAG from executing or resume its execution.

Goal: Pause or unpause a DAG without deleting it. 🎯

Sample Code:

airflow dags pause my_dag
airflow dags unpause my_dag

Before Example: You manually stop tasks by interrupting them or shutting down servers.

# Manually stop tasks:
kill -9 <pid>

After Example: Airflow allows you to pause and unpause DAGs without disrupting the system.

DAG paused: my_dag
DAG unpaused: my_dag
# DAG execution paused and resumed without deletion.
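To check which DAGs are currently paused, list them from the CLI; a sketch (exact output columns vary a little between Airflow versions):

airflow dags list
# dag_id | filepath  | owner   | paused
# my_dag | my_dag.py | airflow | True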

12. Backfilling DAG Runs 📆

Boilerplate Code:

airflow dags backfill <dag_id> -s <start_date> -e <end_date>

Use Case: Re-run missed DAG executions over a specific time range.

Goal: Backfill DAG runs that were missed or failed. 🎯

Sample Code:

airflow dags backfill my_dag -s 2023-01-01 -e 2023-01-07

Before Example: You manually rerun missed tasks for specific dates by rescheduling or manually executing them.

# Manually run missed tasks:
python my_task.py --date 2023-01-01

After Example: Airflow automatically backfills missed runs for the specified dates.

Backfilling DAG: my_dag from 2023-01-01 to 2023-01-07
# DAG runs for the specified dates are backfilled successfully.
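Backfilling interacts with the DAG's catchup setting: with catchup=True (the historical default), the scheduler creates runs for every missed interval since start_date on its own, while catchup=False leaves history to explicit backfill commands. A sketch:

from airflow import DAG
from datetime import datetime

dag = DAG(
    'catchup_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,  # only schedule from now on; use backfill for history
)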

13. Triggering One DAG from Another 🔗

Boilerplate Code:

trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id='other_dag',
    dag=dag
)

Use Case: Trigger a different DAG as part of the execution of the current DAG.

Goal: Chain workflows by triggering one DAG from another. 🎯

Sample Code:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id='other_dag',
    dag=dag
)

Before Example: You manually execute multiple workflows one after another without automated dependency management.

python workflow1.py
python workflow2.py
# Manually chaining workflows.

After Example: Airflow automatically triggers the second DAG as part of the first DAG's workflow.

Task triggered DAG: other_dag
# The second DAG is triggered automatically after the first one completes.
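TriggerDagRunOperator can also pass a payload to the downstream DAG and wait for it to finish; a sketch (conf and wait_for_completion are available from Airflow 2.1):

trigger_and_wait = TriggerDagRunOperator(
    task_id='trigger_and_wait',
    trigger_dag_id='other_dag',
    conf={'source': 'parent_dag'},  # available as dag_run.conf inside other_dag
    wait_for_completion=True,       # hold this task until other_dag finishes
    poke_interval=30,               # check the triggered run every 30 seconds
    dag=dag
)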

14. Email Alerts for Task Failures 📧

Boilerplate Code:

default_args = {
    'email': ['your_email@example.com'],
    'email_on_failure': True
}

Use Case: Send an email notification if a task fails.

Goal: Automatically receive an email when a task fails. 🎯

Sample Code:

default_args = {
    'owner': 'airflow',
    'email': ['your_email@example.com'],
    'email_on_failure': True,
    'start_date': datetime(2023, 1, 1),
}
dag = DAG('alert_dag', default_args=default_args)

Before Example: You manually monitor the logs to check if tasks failed, without any automatic alert system.

# Check logs manually to see if a task failed.
tail -f /var/logs/task.log

After Example: Airflow automatically sends an email notification if a task fails.

Task failed, sending email to: your_email@example.com
# You receive an email alert when a task fails.
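For the alert to actually send, Airflow needs a configured SMTP connection, set in airflow.cfg or via environment variables. A sketch with placeholder host and credentials:

export AIRFLOW__SMTP__SMTP_HOST="smtp.example.com"    # placeholder host
export AIRFLOW__SMTP__SMTP_PORT=587
export AIRFLOW__SMTP__SMTP_USER="airflow@example.com"
export AIRFLOW__SMTP__SMTP_PASSWORD="app-password"    # placeholder secret
export AIRFLOW__SMTP__SMTP_MAIL_FROM="airflow@example.com"
export AIRFLOW__SMTP__SMTP_STARTTLS=True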

15. Setting Task Priority Weight 🏋️

Boilerplate Code:

task = EmptyOperator(
    task_id='high_priority_task',
    priority_weight=10,
    dag=dag
)

Use Case: Assign priority to specific tasks to determine execution order.

Goal: Prioritize certain tasks over others in a DAG. 🎯

Sample Code:

task1 = EmptyOperator(task_id='low_priority_task', priority_weight=5, dag=dag)
task2 = EmptyOperator(task_id='high_priority_task', priority_weight=10, dag=dag)

Before Example: Tasks are executed in the order they are added, without any notion of priority.

# Manually manage task execution order.
python task1.py
python task2.py

After Example: When more tasks are eligible to run than there are free executor slots, Airflow queues the higher-weight tasks first.

Executing high_priority_task (weight: 10) before low_priority_task (weight: 5)
# Tasks are executed in order of their priority weights.
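How a weight counts is controlled by weight_rule: 'downstream' (the default, where a task's effective weight is its own plus everything downstream of it), 'upstream', or 'absolute'. A sketch pinning a weight exactly:

critical = EmptyOperator(
    task_id='critical_task',
    priority_weight=100,
    weight_rule='absolute',  # use priority_weight as-is, without summing over the DAG
    dag=dag
)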

16. Clearing Task Instances in a DAG 🧹

Boilerplate Code:

airflow tasks clear <dag_id> --start-date <date> --end-date <date>

Use Case: Clear the status of tasks that have already been executed in a DAG.

Goal: Reset tasks so that they can be re-executed. 🎯

Sample Code:

airflow tasks clear my_dag --start-date 2023-01-01 --end-date 2023-01-07

Before Example: You manually delete logs or reset status flags in the database for re-executing tasks.

# Manually clear task states by modifying logs:
rm -rf /var/logs/my_task/

After Example: Airflow clears the status of the selected task instances, making them ready for re-execution.

Clearing tasks from 2023-01-01 to 2023-01-07 for DAG: my_dag
# Tasks are cleared and ready for rerun.
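The clear command takes filters, so you can reset only what you need; a sketch clearing just the failed instances, then just the tasks whose IDs match a regex (the 'load_.*' pattern is a placeholder):

airflow tasks clear my_dag --start-date 2023-01-01 --end-date 2023-01-07 --only-failed
airflow tasks clear my_dag --task-regex 'load_.*' --start-date 2023-01-01 --end-date 2023-01-07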

17. Running Airflow in Sequential Executor Mode 🛠️

Boilerplate Code:

export AIRFLOW__CORE__EXECUTOR=SequentialExecutor
airflow scheduler

Use Case: Run tasks one at a time in a lightweight environment.

Goal: Use Airflow's SequentialExecutor to run tasks in sequence with limited resources. 🎯

Sample Code:

export AIRFLOW__CORE__EXECUTOR=SequentialExecutor  # also the default when using a SQLite metadata DB
airflow scheduler

Before Example: You manage multiple tasks on different systems without centralized scheduling.

# Manually execute tasks one at a time:
python task1.py
python task2.py

After Example: Airflow schedules tasks in sequence, managing them in the correct order automatically.

Scheduler running with SequentialExecutor.
# Tasks are executed one after the other in a lightweight mode.
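For quick local experiments, Airflow 2.2+ also bundles everything into one command; a sketch:

airflow standalone
# Initializes the database, creates an admin user (credentials are printed to the console),
# and starts the webserver and scheduler together. Intended for local testing only.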

18. Using ExternalTaskSensor to Wait for External DAGs ⏳

Boilerplate Code:

ExternalTaskSensor(
    task_id='wait_for_external_dag',
    external_dag_id='external_dag',
    external_task_id='external_task',
    timeout=600,
    dag=dag
)

Use Case: Wait for a task in an external DAG to complete before running tasks in the current DAG.

Goal: Synchronize tasks across different DAGs. 🎯

Sample Code:

from airflow.sensors.external_task import ExternalTaskSensor

wait_task = ExternalTaskSensor(
    task_id='wait_for_external_task',
    external_dag_id='external_dag',
    external_task_id='external_task',
    timeout=600,  # give up after 600 seconds
    dag=dag
)

Before Example: You manually monitor the status of external tasks and trigger dependent workflows yourself.

# Check manually if external tasks are complete:
python check_status.py

After Example: Airflow automatically waits for the external task to complete before executing the dependent task.

Waiting for external task to complete: external_task
# Dependent task runs only after the external task completes.
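By default the sensor looks for an external DAG run with the same logical date as its own. When the two DAGs run on different schedules, offset the lookup with execution_delta; a sketch assuming the external DAG runs two hours earlier than this one:

from datetime import timedelta

wait_earlier = ExternalTaskSensor(
    task_id='wait_for_earlier_dag',
    external_dag_id='external_dag',
    external_task_id='external_task',
    execution_delta=timedelta(hours=2),  # external run's logical date is 2 hours before ours
    timeout=600,
    dag=dag
)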

19. SubDAGs (Nested DAGs) 🧩

Boilerplate Code:

subdag_task = SubDagOperator(
    task_id='my_subdag',
    subdag=create_subdag(),
    dag=dag
)

Use Case: Break down complex workflows into smaller, reusable components (SubDAGs).

Goal: Create nested workflows that can be reused across different DAGs. 🎯

Sample Code:

from airflow.operators.subdag import SubDagOperator

# create_subdag is a user-defined factory that must return a DAG whose dag_id
# follows the '<parent_dag_id>.<task_id>' convention, e.g. 'main_dag.my_subdag'.
subdag_task = SubDagOperator(
    task_id='my_subdag',
    subdag=create_subdag('main_dag', 'my_subdag', default_args),
    dag=dag
)

Before Example: You create large, complex workflows where all tasks are defined in a single file, making them harder to manage.

# All tasks in one big script:
python workflow.py

After Example: With SubDAGs, you can break the workflow into smaller pieces that can be managed and reused.

SubDAG created: my_subdag
# The complex workflow is split into more manageable, smaller workflows.
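Note that SubDagOperator is deprecated in Airflow 2.x; TaskGroups give the same grouping without spawning a nested DAG and are the recommended replacement. A sketch:

from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator

with TaskGroup(group_id='etl_group', dag=dag) as etl_group:
    extract = EmptyOperator(task_id='extract', dag=dag)
    load = EmptyOperator(task_id='load', dag=dag)
    extract >> load
# Tasks inside render as one collapsible box in the UI; their IDs are prefixed, e.g. 'etl_group.extract'.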

20. XComs for Task Communication 📨

Boilerplate Code:

ti.xcom_push(key='message', value='Hello from task 1')  # ti = the running task's TaskInstance
ti.xcom_pull(task_ids='task1', key='message')

Use Case: Share data between tasks using XComs.

Goal: Pass information from one task to another in a DAG. 🎯

Sample Code:

from airflow.operators.python import PythonOperator

def push_message(ti):  # Airflow injects ti (the TaskInstance) into the callable
    ti.xcom_push(key='message', value='Hello from task 1')

def pull_message(ti):
    print(ti.xcom_pull(task_ids='task1', key='message'))

task1 = PythonOperator(task_id='task1', python_callable=push_message, dag=dag)
task2 = PythonOperator(task_id='task2', python_callable=pull_message, dag=dag)

Before Example: You manually pass data between tasks by writing it to files or databases.

# Save output to a file:
echo "Hello from task 1" > message.txt
# Read output in another script:
cat message.txt

After Example: With Airflow XComs, you can directly pass data between tasks without needing external storage.

XCom pushed: 'Hello from task 1'
XCom pulled: 'Hello from task 1'
# Data shared between tasks using Airflow XComs.
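In Airflow 2.0+, the TaskFlow API hides most explicit XCom calls: a @task function's return value is pushed to XCom automatically, and passing it to another @task pulls it back. A sketch:

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2023, 1, 1), schedule_interval=None, catchup=False)
def xcom_taskflow():
    @task
    def produce():
        return 'Hello from task 1'  # return value is pushed to XCom automatically

    @task
    def consume(message):
        print(message)  # pulled from XCom automatically

    consume(produce())

xcom_taskflow()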