Apache Airflow - Operators

Apache Airflow provides a variety of built-in operators that you can use to create tasks within your Directed Acyclic Graphs (DAGs).
Commonly Used Operators -
DummyOperator - It doesn't execute anything; it acts as a placeholder, useful for marking logical points (start, end, joins) within your workflow.
BashOperator - It is used to execute a bash command or script.
PythonOperator - It is used to run a Python callable (function).
EmailOperator - It is used to send emails to one or more recipients.
MySqlOperator - It is used to run a SQL query against a MySQL database.
S3ToHiveOperator - It transfers data from Amazon S3 into a Hive table.
HttpSensor - It polls an HTTP endpoint until the endpoint responds successfully.
Let's get Coding -
DummyOperator -
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
}

dag = DAG(
    'dummy_operator_example_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

task_1 = DummyOperator(task_id='task_1', dag=dag)
task_2 = DummyOperator(task_id='task_2', dag=dag)

task_1 >> task_2
```
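Note: from Airflow 2.3 onwards, `DummyOperator` is deprecated in favour of `EmptyOperator`, which behaves the same way. A minimal sketch of the drop-in replacement:

```python
# Airflow 2.3+ drop-in replacement for DummyOperator
from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id='start', dag=dag)
```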
BashOperator -
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}

dag = DAG(
    'bash_operator_example_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

task1 = BashOperator(
    task_id='task1',
    bash_command='echo "Hello Bash Operator!"',
    dag=dag,
)
```
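`bash_command` is a templated field, so Jinja expressions are rendered before the command runs. A minimal sketch using the built-in `{{ ds }}` macro (the run's logical date):

```python
# bash_command is templated; {{ ds }} renders as the logical date (YYYY-MM-DD)
print_date = BashOperator(
    task_id='print_date',
    bash_command='echo "Run date: {{ ds }}"',
    dag=dag,
)
```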
PythonOperator -
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def our_python_function():
    print("You can write your python function logic here")


# Default parameters for the workflow
default_args = {
    'depends_on_past': False,
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
}

dag = DAG(
    'python_operator_example_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

task1 = PythonOperator(
    task_id='task1',
    python_callable=our_python_function,
    dag=dag,
)
```
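You can also pass arguments into the callable with `op_args` / `op_kwargs`. A minimal sketch (the `greet` function and its arguments are illustrative):

```python
def greet(name, greeting='Hello'):
    print(f"{greeting}, {name}!")

greet_task = PythonOperator(
    task_id='greet_task',
    python_callable=greet,
    op_kwargs={'name': 'Airflow', 'greeting': 'Hi'},  # forwarded as keyword arguments
    dag=dag,
)
```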
EmailOperator -
Before using the `EmailOperator`, ensure that your Airflow configuration (`airflow.cfg`) defines the SMTP server details:

```ini
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = user@example.com
smtp_password = password
smtp_port = 25
smtp_mail_from = airflow@example.com
```
```python
from airflow.operators.email import EmailOperator

send_email = EmailOperator(
    task_id='send_email',
    to='user@example.com',
    subject='Airflow Alert',
    html_content='<p>Your Airflow job has finished.</p>',
    dag=dag,
)
```
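`to`, `subject`, and `html_content` are templated fields, so Jinja works here too, e.g. `subject='Airflow Alert for {{ ds }}'`.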
MySqlOperator -
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator

default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}

dag = DAG(
    'mysql_operator_example_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# Connection details (host, user, password, database) live in an Airflow
# connection, referenced here by mysql_conn_id.
task1 = MySqlOperator(
    task_id='task1',
    mysql_conn_id='mysql_default',
    sql='''CREATE TABLE employee(
        emp_id INTEGER NOT NULL,
        emp_name VARCHAR(50) NOT NULL,
        emp_title VARCHAR(50));''',
    dag=dag,
)

task2 = MySqlOperator(
    task_id='task2',
    mysql_conn_id='mysql_default',
    sql='INSERT INTO employee (emp_id, emp_name, emp_title) VALUES (%s, %s, %s)',
    parameters=(1, "abc", "HR"),
    dag=dag,
)

task1 >> task2
```
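S3ToHiveOperator -
A minimal sketch, assuming the Amazon and Hive provider packages are installed and that `aws_default` and `hive_cli_default` connections are configured; the bucket, key, table, and column names below are illustrative:

```python
from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator

# Loads a delimited file from S3 into a Hive table.
s3_to_hive_task = S3ToHiveOperator(
    task_id='s3_to_hive_task',
    s3_key='s3://my-bucket/data/employee.csv',  # hypothetical bucket and key
    field_dict={'emp_id': 'INT', 'emp_name': 'STRING', 'emp_title': 'STRING'},
    hive_table='employee',
    delimiter=',',
    headers=True,           # first line of the file is a header row
    check_headers=True,     # validate the header against field_dict keys
    aws_conn_id='aws_default',
    hive_cli_conn_id='hive_cli_default',
    dag=dag,
)
```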
HttpSensor -
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor

default_args = {'owner': 'airflow', 'start_date': datetime(2024, 1, 1)}

dag = DAG(
    'http_sensor_example_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# http_conn_id should point to an Airflow connection whose host is the
# base URL (e.g. http://example.com); endpoint is the path relative to it.
http_sensor_task = HttpSensor(
    task_id='http_sensor_task',
    http_conn_id='http_conn_id',
    endpoint='health',
    poke_interval=60,   # seconds between checks
    timeout=120,        # give up after this many seconds
    dag=dag,
)
```
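By default the sensor succeeds on any non-error response. To assert on the response body instead, pass a `response_check` callable; a minimal sketch (the JSON shape is illustrative):

```python
# Succeed only when the endpoint reports status "ok" in its JSON body
healthy_check = HttpSensor(
    task_id='healthy_check',
    http_conn_id='http_conn_id',
    endpoint='health',
    response_check=lambda response: response.json().get('status') == 'ok',
    poke_interval=60,
    timeout=120,
    dag=dag,
)
```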