20 Airflow Concepts with Before-and-After Examples
Table of contents
- 1. Installing Airflow via pip
- 2. Starting the Airflow Web Server
- 3. Initializing the Airflow Database
- 4. Creating a New DAG (Directed Acyclic Graph)
- 5. Scheduling a Task with a Cron Expression
- 6. Defining Task Dependencies in a DAG
- 7. Using Task Retries in Airflow
- 8. Branching in Airflow (Conditional Tasks)
- 9. Setting Task Timeouts in Airflow
- 10. Triggering DAGs Manually
- 11. Pausing and Unpausing a DAG
- 12. Backfilling DAG Runs
- 13. Triggering a Task from Another Task
- 14. Email Alerts for Task Failures
- 15. Setting Task Priority Weight
- 16. Clearing Task Instances in a DAG
- 17. Running Airflow in Sequential Executor Mode
- 18. Using ExternalTaskSensor to Wait for External DAGs
- 19. SubDAGs (Nested DAGs)
- 20. XComs for Task Communication
1. Installing Airflow via pip
Boilerplate Code:
pip install apache-airflow
Use Case: Install Apache Airflow to automate workflows and tasks.
Goal: Set up Airflow for task automation on your local environment.
Sample Code:
pip install apache-airflow
Before Example: You manually manage task workflows and dependencies using scripts and cron jobs.
python my_workflow.py
# Cron jobs are used for periodic task execution.
After Example: Once Airflow is installed, you can run workflows automatically and manage tasks through the Airflow interface.
Successfully installed apache-airflow
# Airflow is now installed and ready to automate task workflows.
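Because Airflow pins a large dependency tree, the official docs recommend installing against a constraints file. A minimal sketch, assuming Airflow 2.9.2 on Python 3.8 (swap in your own versions):
pip install "apache-airflow==2.9.2" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.2/constraints-3.8.txt"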
2. Starting the Airflow Web Server
Boilerplate Code:
airflow webserver -p 8080
Use Case: Start the Airflow web UI to manage and monitor workflows.
Goal: Access the Airflow web interface to view, schedule, and control tasks.
Sample Code:
airflow webserver -p 8080
Before Example: You manually check logs and monitor scripts in your terminal without a visual interface.
tail -f workflow.log # Manually checking task status through logs.
After Example: Once the web server starts, you can monitor and manage tasks through a browser.
Starting the Airflow webserver on port 8080...
# Navigate to http://localhost:8080 to view and manage tasks.
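For quick local experiments, recent Airflow 2.x releases also ship a standalone command that initializes the database, creates a login user, and starts the scheduler and web server together:
airflow standalone
# All components run in one process; the admin credentials are printed to the console.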
3. Initializing the Airflow Database
Boilerplate Code:
airflow db init
Use Case: Set up the metadata database for Airflow.
Goal: Initialize the database to track DAGs (Directed Acyclic Graphs), tasks, and other metadata.
Sample Code:
airflow db init
Before Example: You manually track workflow tasks and dependencies using external tools like spreadsheets.
# Spreadsheet with tasks and their statuses.
# Manually updated for tracking.
After Example: Once the Airflow database is initialized, DAGs and task metadata are automatically stored and managed.
Database initialized successfully.
# Workflow and task information is now automatically stored in Airflow's database.
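Note that in Airflow 2.7 and later, airflow db init is deprecated; the equivalent (and upgrade-safe) command is:
airflow db migrate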
4. Creating a New DAG (Directed Acyclic Graph)
Boilerplate Code:
from airflow import DAG
from airflow.operators.empty import EmptyOperator  # EmptyOperator replaces the deprecated DummyOperator (Airflow 2.3+)
from datetime import datetime
dag = DAG('example_dag', start_date=datetime(2023, 1, 1))
task = EmptyOperator(task_id='dummy_task', dag=dag)
Use Case: Define a DAG to represent a set of tasks with dependencies.
Goal: Create a simple Airflow DAG to automate a sequence of tasks.
Sample Code:
from airflow import DAG
from airflow.operators.empty import EmptyOperator  # replaces the deprecated DummyOperator
from datetime import datetime

dag = DAG('example_dag', start_date=datetime(2023, 1, 1))
task = EmptyOperator(task_id='dummy_task', dag=dag)
Before Example: You manually run tasks in a specific order, keeping track of dependencies yourself.
python task1.py
python task2.py
# You need to ensure that tasks run in the right order manually.
After Example: With Airflow, the DAG defines task dependencies and execution order.
DAG created: example_dag
# Tasks are now automated within the DAG, running according to the defined order and dependencies.
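Airflow also supports a context-manager style that saves you from passing dag= to every operator; a minimal sketch:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG('example_dag', start_date=datetime(2023, 1, 1), schedule_interval=None, catchup=False) as dag:
    task = EmptyOperator(task_id='dummy_task')  # picks up the DAG from the enclosing context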
5. Scheduling a Task with a Cron Expression
Boilerplate Code:
dag = DAG(
    'scheduled_dag',
    default_args={'owner': 'airflow', 'start_date': datetime(2023, 1, 1)},
    schedule_interval='0 12 * * *',  # runs daily at noon
)
Use Case: Schedule tasks in Airflow using cron-like expressions.
Goal: Automatically run tasks on a specified schedule.
Sample Code:
dag = DAG(
    'scheduled_dag',
    default_args={'owner': 'airflow', 'start_date': datetime(2023, 1, 1)},
    schedule_interval='0 12 * * *',  # runs daily at noon
)
Before Example: You manually run tasks or schedule them via external cron jobs.
# Cron job scheduled in the system:
0 12 * * * /usr/bin/python3 /path/to/task.py
After Example: Airflow runs the task automatically every day at noon based on the schedule defined in the DAG.
Task scheduled successfully: scheduled_dag
# Task is now set to run daily at noon without manual intervention.
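Airflow also accepts cron presets such as '@daily' and '@hourly', and in Airflow 2.4+ the schedule parameter supersedes schedule_interval; a sketch:
dag = DAG(
    'scheduled_dag',
    default_args={'owner': 'airflow', 'start_date': datetime(2023, 1, 1)},
    schedule='@daily',  # preset for '0 0 * * *'; 'schedule' replaces 'schedule_interval' in Airflow 2.4+
)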
6. Defining Task Dependencies in a DAG
Boilerplate Code:
task1 >> task2  # task2 runs after task1
Use Case: Ensure tasks run in a specific order within a DAG.
Goal: Define the order of execution for tasks in a workflow.
Sample Code:
task1 = EmptyOperator(task_id='start', dag=dag)  # EmptyOperator replaces the deprecated DummyOperator
task2 = EmptyOperator(task_id='end', dag=dag)
task1 >> task2  # task2 runs after task1
Before Example: Tasks run in whatever order you happen to invoke them, so you track dependencies by hand.
# Running scripts manually in a specific order:
python script1.py
python script2.py
After Example: Airflow ensures tasks are executed in the defined sequence automatically.
# Task dependency set: task1 will run before task2.
Task: start -> end
# Airflow handles the execution order automatically.
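The bitshift syntax also works with lists for fan-out, and the chain helper expresses long linear sequences; a short sketch where task3 is an extra illustrative task:
from airflow.models.baseoperator import chain

task1 >> [task2, task3]     # fan-out: task2 and task3 both run after task1
chain(task1, task2, task3)  # alternatively, equivalent to task1 >> task2 >> task3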
7. Using Task Retries in Airflow
Boilerplate Code:
default_args = {'retries': 3}
Use Case: Automatically retry failed tasks in Airflow.
Goal: Set up automatic retries for tasks that fail.
Sample Code:
default_args = {
    'owner': 'airflow',
    'retries': 3,  # retry failed tasks up to 3 times
}
dag = DAG('retry_dag', default_args=default_args, start_date=datetime(2023, 1, 1))
Before Example: You manually rerun scripts or tasks after they fail.
# If task fails, manually run it again:
python task.py
After Example: Airflow automatically retries the failed task up to the specified number of times.
Task failed, retrying... (Attempt 1 of 3)
Task completed successfully after retry.
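You can also control the spacing of retries; retry_delay and retry_exponential_backoff are standard operator arguments. A sketch:
from datetime import timedelta

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between attempts
    'retry_exponential_backoff': True,    # grow the delay after each failure
}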
8. Branching in Airflow (Conditional Tasks)
Boilerplate Code:
branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    dag=dag,
)
Use Case: Execute tasks conditionally based on the outcome of previous tasks.
Goal: Use branching to control the flow of task execution based on conditions.
Sample Code:
def choose_branch():
if condition:
return 'task1'
else:
return 'task2'
branch_task = BranchPythonOperator(
task_id='branching',
python_callable=choose_branch,
dag=dag
)
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
branch_task >> [task1, task2]
Before Example: You manually manage conditions and execute the appropriate tasks based on logic.
if condition:
python task1.py
else:
python task2.py
After Example: Airflow automatically chooses which tasks to execute based on the branching logic.
Condition met, branching to task1.
# Airflow handles the branching and executes the correct tasks.
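If the branches re-join, the join task needs a trigger rule that tolerates the skipped branch; a minimal sketch using the Airflow 2.3+ rule name:
join = EmptyOperator(
    task_id='join',
    trigger_rule='none_failed_min_one_success',  # run once either branch succeeds; the skipped branch doesn't block it
    dag=dag,
)
[task1, task2] >> join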
9. Setting Task Timeouts in Airflow
Boilerplate Code:
default_args = {'execution_timeout': timedelta(minutes=5)}
Use Case: Limit the execution time of tasks to prevent long-running operations.
Goal: Set timeouts for tasks to avoid hanging operations.
Sample Code:
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'execution_timeout': timedelta(minutes=5),  # each task must complete within 5 minutes
}
dag = DAG('timeout_dag', default_args=default_args, start_date=datetime(2023, 1, 1))
Before Example: You manually monitor tasks and stop them if they run too long.
# Manually check and kill long-running tasks:
kill -9 <task_pid>
After Example: Airflow automatically stops tasks that exceed the specified timeout.
Task exceeded timeout of 5 minutes, terminating.
# Task is automatically stopped after 5 minutes if not completed.
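execution_timeout can also be set per task to override the default_args value; a sketch where long_running_job is a hypothetical callable defined elsewhere:
from airflow.operators.python import PythonOperator
from datetime import timedelta

slow_task = PythonOperator(
    task_id='slow_task',
    python_callable=long_running_job,         # hypothetical function defined elsewhere
    execution_timeout=timedelta(minutes=10),  # overrides the 5-minute default for this task only
    dag=dag,
)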
10. Triggering DAGs Manually
Boilerplate Code:
airflow dags trigger <dag_id>
Use Case: Trigger a DAG to run outside of its regular schedule.
Goal: Manually trigger a DAG when needed.
Sample Code:
airflow dags trigger my_dag
Before Example: You run the script manually if you need to rerun a workflow outside of its schedule.
python my_workflow.py
After Example: Airflow allows you to trigger the DAG manually from the command line.
DAG triggered: my_dag
# DAG execution started manually outside of the regular schedule.
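Manual triggers can also carry a JSON payload that tasks read via dag_run.conf; the key below is illustrative:
airflow dags trigger my_dag --conf '{"run_mode": "adhoc"}'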
11. Pausing and Unpausing a DAG
Boilerplate Code:
airflow dags pause <dag_id>
airflow dags unpause <dag_id>
Use Case: Temporarily stop a DAG from executing or resume its execution.
Goal: Pause or unpause a DAG without deleting it.
Sample Code:
airflow dags pause my_dag
airflow dags unpause my_dag
Before Example: You manually stop tasks by interrupting them or shutting down servers.
# Manually stop tasks:
kill -9 <pid>
After Example: Airflow allows you to pause and unpause DAGs without disrupting the system.
DAG paused: my_dag
DAG unpaused: my_dag
# DAG execution paused and resumed without deletion.
12. Backfilling DAG Runs
Boilerplate Code:
airflow dags backfill <dag_id> -s <start_date> -e <end_date>
Use Case: Re-run missed DAG executions over a specific time range.
Goal: Backfill DAG runs that were missed or failed.
Sample Code:
airflow dags backfill my_dag -s 2023-01-01 -e 2023-01-07
Before Example: You manually rerun missed tasks for specific dates by rescheduling or manually executing them.
# Manually run missed tasks:
python my_task.py --date 2023-01-01
After Example: Airflow automatically backfills missed runs for the specified dates.
Backfilling DAG: my_dag from 2023-01-01 to 2023-01-07
# DAG runs for the specified dates are backfilled successfully.
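Backfilling interacts with the DAG-level catchup flag: with catchup=True (the historical default) the scheduler fills in missed intervals automatically, while catchup=False leaves them to explicit backfill commands. A sketch:
dag = DAG(
    'my_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,  # skip auto-backfill; run 'airflow dags backfill' explicitly when needed
)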
13. Triggering a Task from Another Task
Boilerplate Code:
trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id='other_dag',
    dag=dag,
)
Use Case: Trigger a different DAG as part of the execution of the current DAG.
Goal: Chain workflows by triggering one DAG from another.
Sample Code:
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id='other_dag',
    dag=dag,
)
Before Example: You manually execute multiple workflows one after another without automated dependency management.
python workflow1.py
python workflow2.py
# Manually chaining workflows.
After Example: Airflow automatically triggers the second DAG as part of the first DAG's workflow.
Task triggered DAG: other_dag
# The second DAG is triggered automatically after the first one completes.
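By default the operator fires the other DAG and moves on; in recent Airflow 2.x versions you can make it block until the triggered run finishes:
trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id='other_dag',
    wait_for_completion=True,  # block until the triggered DAG run reaches a terminal state
    dag=dag,
)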
14. Email Alerts for Task Failures
Boilerplate Code:
default_args = {
    'email': ['your_email@example.com'],
    'email_on_failure': True,
}
Use Case: Send an email notification if a task fails.
Goal: Automatically receive an email when a task fails.
Sample Code:
default_args = {
    'owner': 'airflow',
    'email': ['your_email@example.com'],
    'email_on_failure': True,
    'start_date': datetime(2023, 1, 1),
}
dag = DAG('alert_dag', default_args=default_args)
Before Example: You manually monitor the logs to check if tasks failed, without any automatic alert system.
# Check logs manually to see if a task failed.
tail -f /var/logs/task.log
After Example: Airflow automatically sends an email notification if a task fails.
Task failed, sending email to: your_email@example.com
# You receive an email alert when a task fails.
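Emails only go out if an SMTP server is configured in the [smtp] section of airflow.cfg or via environment variables; a sketch with placeholder host and sender:
export AIRFLOW__SMTP__SMTP_HOST=smtp.example.com
export AIRFLOW__SMTP__SMTP_PORT=587
export AIRFLOW__SMTP__SMTP_MAIL_FROM=airflow@example.com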
15. Setting Task Priority Weight
Boilerplate Code:
task = EmptyOperator(
    task_id='high_priority_task',
    priority_weight=10,
    dag=dag,
)
Use Case: Assign priority to specific tasks to determine execution order.
Goal: Prioritize certain tasks over others in a DAG.
Sample Code:
task1 = EmptyOperator(task_id='low_priority_task', priority_weight=5, dag=dag)
task2 = EmptyOperator(task_id='high_priority_task', priority_weight=10, dag=dag)
Before Example: Tasks are executed in the order they are added, without any notion of priority.
# Manually manage task execution order.
python task1.py
python task2.py
After Example: When more tasks are queued than there are free worker slots, Airflow runs the higher-weighted tasks first.
Executing high_priority_task (weight: 10) before low_priority_task (weight: 5)
# Tasks are executed in order of their priority weights.
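How weights accumulate is governed by the weight_rule operator argument; a sketch of overriding the default:
task = EmptyOperator(
    task_id='high_priority_task',
    priority_weight=10,
    weight_rule='absolute',  # use the weight as-is; 'downstream' (the default) and 'upstream' sum weights along the DAG
    dag=dag,
)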
16. Clearing Task Instances in a DAG
Boilerplate Code:
airflow tasks clear <dag_id> --start-date <date> --end-date <date>
Use Case: Clear the status of tasks that have already been executed in a DAG.
Goal: Reset tasks so that they can be re-executed.
Sample Code:
airflow tasks clear my_dag --start-date 2023-01-01 --end-date 2023-01-07
Before Example: You manually delete logs or reset status flags in the database for re-executing tasks.
# Manually clear task states by modifying logs:
rm -rf /var/logs/my_task/
After Example: Airflow clears the status of the selected task instances, making them ready for re-execution.
Clearing tasks from 2023-01-01 to 2023-01-07 for DAG: my_dag
# Tasks are cleared and ready for rerun.
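The command also accepts a task regex, so you can clear a subset of tasks instead of the whole DAG; the pattern below is illustrative:
airflow tasks clear my_dag -t "extract_.*" --start-date 2023-01-01 --end-date 2023-01-07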
17. Running Airflow in Sequential Executor Mode
Boilerplate Code:
export AIRFLOW__CORE__EXECUTOR=SequentialExecutor
airflow scheduler
Use Case: Run tasks one at a time in a lightweight environment.
Goal: Use Airflow's SequentialExecutor to run tasks in sequence with limited resources.
Sample Code:
export AIRFLOW__CORE__EXECUTOR=SequentialExecutor  # or set executor = SequentialExecutor in airflow.cfg
airflow scheduler
Before Example: You manage multiple tasks on different systems without centralized scheduling.
# Manually execute tasks one at a time:
python task1.py
python task2.py
After Example: Airflow schedules tasks in sequence, managing them in the correct order automatically.
Scheduler running with SequentialExecutor.
# Tasks are executed one after the other in a lightweight mode.
18. Using ExternalTaskSensor to Wait for External DAGs
Boilerplate Code:
ExternalTaskSensor(
    task_id='wait_for_external_dag',
    external_dag_id='external_dag',
    external_task_id='external_task',
    timeout=600,
    dag=dag,
)
Use Case: Wait for a task in an external DAG to complete before running tasks in the current DAG.
Goal: Synchronize tasks across different DAGs.
Sample Code:
from airflow.sensors.external_task import ExternalTaskSensor

wait_task = ExternalTaskSensor(
    task_id='wait_for_external_task',
    external_dag_id='external_dag',
    external_task_id='external_task',
    timeout=600,
    dag=dag,
)
Before Example: You manually monitor the status of external tasks and trigger dependent workflows yourself.
# Check manually if external tasks are complete:
python check_status.py
After Example: Airflow automatically waits for the external task to complete before executing the dependent task.
Waiting for external task to complete: external_task
# Dependent task runs only after the external task completes.
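By default the sensor matches the external run with the same logical date as the current run; if the two DAGs are on offset schedules, execution_delta shifts the lookup. A sketch:
from datetime import timedelta

wait_task = ExternalTaskSensor(
    task_id='wait_for_external_task',
    external_dag_id='external_dag',
    external_task_id='external_task',
    execution_delta=timedelta(hours=1),  # look at the external run scheduled one hour earlier
    timeout=600,
    dag=dag,
)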
19. SubDAGs (Nested DAGs)
Boilerplate Code:
subdag_task = SubDagOperator(
    task_id='my_subdag',
    subdag=create_subdag(),  # create_subdag is a user-defined factory returning a DAG
    dag=dag,
)
Use Case: Break down complex workflows into smaller, reusable components (SubDAGs).
Goal: Create nested workflows that can be reused across different DAGs.
Sample Code:
from airflow.operators.subdag import SubDagOperator

subdag_task = SubDagOperator(
    task_id='my_subdag',
    subdag=create_subdag('main_dag', 'my_subdag', default_args),  # user-defined factory; the child dag_id must be 'main_dag.my_subdag'
    dag=dag,
)
Before Example: You create large, complex workflows where all tasks are defined in a single file, making them harder to manage.
# All tasks in one big script:
python workflow.py
After Example: With SubDAGs, you can break the workflow into smaller pieces that can be managed and reused.
SubDAG created: my_subdag
# The complex workflow is split into more manageable, smaller workflows.
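Note that SubDagOperator is deprecated in newer Airflow releases; TaskGroups give the same visual and logical grouping without running a nested DAG. A minimal sketch:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG('main_dag', start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    with TaskGroup('my_group') as my_group:
        step1 = EmptyOperator(task_id='step1')
        step2 = EmptyOperator(task_id='step2')
        step1 >> step2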
20. XComs for Task Communication
Boilerplate Code:
ti.xcom_push(key='message', value='Hello from task 1')  # ti is the TaskInstance passed into a task's callable
ti.xcom_pull(task_ids='task1', key='message')
Use Case: Share data between tasks using XComs.
Goal: Pass information from one task to another in a DAG.
Sample Code:
from airflow.operators.python import PythonOperator

def push_message(ti):  # Airflow injects the TaskInstance as 'ti'
    ti.xcom_push(key='message', value='Hello from task 1')

def pull_message(ti):
    message = ti.xcom_pull(task_ids='task1', key='message')

task1 = PythonOperator(task_id='task1', python_callable=push_message, dag=dag)
task2 = PythonOperator(task_id='task2', python_callable=pull_message, dag=dag)
task1 >> task2
Before Example: You manually pass data between tasks by writing it to files or databases.
# Save output to a file:
echo "Hello from task 1" > message.txt
# Read output in another script:
cat message.txt
After Example: With Airflow XComs, you can directly pass data between tasks without needing external storage.
XCom pushed: 'Hello from task 1'
XCom pulled: 'Hello from task 1'
# Data shared between tasks using Airflow XComs.
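The TaskFlow API (Airflow 2.0+) hides the XCom plumbing entirely: values returned from @task functions are pushed automatically, and passing them as arguments pulls them. A sketch, assumed to live inside a DAG context:
from airflow.decorators import task

@task
def produce():
    return 'Hello from task 1'  # return value is stored as an XCom automatically

@task
def consume(message):
    print(message)  # argument is pulled from XCom behind the scenes

consume(produce())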