Apache Airflow - Operators

samyak jain
3 min read

Apache Airflow provides a variety of built-in operators that you can use to create tasks within your Directed Acyclic Graphs (DAGs).

Commonly Used Operators -

  1. DummyOperator - It doesn't execute any task, but it can be useful to mark certain points within your workflow. It acts as a placeholder operator.

    Params & Definition - DummyOperator

  2. BashOperator - It is used to execute a bash command.

    Params & Definition - BashOperator

  3. PythonOperator - It is used to run a Python callable (function) as a task.

    Params & Definition - PythonOperator

  4. EmailOperator - It is used to send an email to one or more recipients.

    Params & Definition - EmailOperator

  5. MySqlOperator - It is used to run SQL queries against a MySQL database.

    Params & Definition - MySqlOperator

  6. S3ToHiveOperator - It transfers data from Amazon S3 to Hive (see the example sketch at the end of the coding section below).

    Params & Definition - S3ToHiveOperator

Let's Get Coding -

  1. DummyOperator -

     from airflow import DAG
     from airflow.operators.dummy import DummyOperator
     from datetime import datetime
    
     default_args = {
         'owner': 'airflow',
         'depends_on_past': False,
         'start_date': datetime(2022, 1, 1)
     }
    
     dag = DAG('dummy_operator_example_dag', default_args=default_args, schedule_interval='@daily')
    
     task_1 = DummyOperator(task_id='task_1', dag=dag)
     task_2 = DummyOperator(task_id='task_2', dag=dag)
    
     task_1 >> task_2
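
     In recent Airflow releases DummyOperator has been renamed; the drop-in replacement is EmptyOperator, which behaves the same way. A minimal sketch (the task id below is illustrative):

     from airflow.operators.empty import EmptyOperator

     placeholder = EmptyOperator(task_id='placeholder', dag=dag)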
    
  2. BashOperator -

     from airflow import DAG
     from airflow.operators.bash import BashOperator
     from datetime import datetime
    
     default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
    
     dag = DAG('bash_operator_example_dag', 
               default_args=default_args, schedule_interval='@daily')
    
     task1 = BashOperator(
         task_id='task1',
         bash_command='echo "Hello Bash Operator!"',
         dag=dag
     )
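
     The bash_command field is templated, so Airflow's Jinja macros can be used inside it. A small sketch (the task id is illustrative):

     task2 = BashOperator(
         task_id='print_run_date',
         # {{ ds }} is replaced with the run's logical date at execution time
         bash_command='echo "Run date: {{ ds }}"',
         dag=dag
     )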
    
  3. PythonOperator -

     from datetime import datetime
     from airflow import DAG
     from airflow.operators.python import PythonOperator
    
     def our_python_function():
         print("You can write your python function logic here")
    
     # Default parameters for the workflow
     default_args = {
         'depends_on_past': False,
         'owner': 'airflow',
         'start_date': datetime(2021, 1, 1)
     }
    
     dag = DAG('python_operator_example_dag', default_args=default_args, schedule_interval='@daily')
    
     task1 = PythonOperator(
         task_id='task1',
         python_callable=our_python_function,
         dag=dag
     )
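
     Arguments can be passed to the callable through op_args / op_kwargs. A small sketch (the function and values are illustrative):

     def greet(name):
         print(f"Hello, {name}!")

     task2 = PythonOperator(
         task_id='task2',
         python_callable=greet,
         # op_kwargs is unpacked into the callable as keyword arguments
         op_kwargs={'name': 'Airflow'},
         dag=dag
     )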


  4. EmailOperator -

     Before using the EmailOperator, ensure that your Airflow configuration (airflow.cfg) defines the SMTP server details under the [smtp] section:

     [smtp]
     smtp_host = smtp.example.com
     smtp_starttls = True
     smtp_ssl = False
     smtp_user = user@example.com
     smtp_password = password
     smtp_port = 25
     smtp_mail_from = airflow@example.com

     from airflow import DAG
     from airflow.operators.email import EmailOperator
     from datetime import datetime

     default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}

     dag = DAG('email_operator_example_dag', default_args=default_args, schedule_interval='@daily')

     send_email = EmailOperator(
         task_id='send_email',
         to='user@example.com',
         subject='Airflow Alert',
         html_content='<p>Your Airflow job has finished.</p>',
         dag=dag
     )
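
     The to, subject and html_content fields are templated as well, so Jinja macros work here too. A small sketch (task id and wording are illustrative):

     alert_email = EmailOperator(
         task_id='alert_email',
         to='user@example.com',
         subject='Airflow Alert for {{ ds }}',
         html_content='<p>DAG {{ dag.dag_id }} finished its run for {{ ds }}.</p>',
         dag=dag
     )
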
  5. MySqlOperator -

     MySqlOperator runs its SQL through an Airflow connection (mysql_conn_id) that you configure once with your MySQL host, user, password and database; you do not open the connection yourself in the DAG file. The operator ships with the apache-airflow-providers-mysql package.

     from datetime import datetime
     from airflow import DAG
     from airflow.providers.mysql.operators.mysql import MySqlOperator
    
     default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
    
     dag = DAG('mysql_operator_example_dag', 
               default_args=default_args, schedule_interval='@daily')
    
     task1 = MySqlOperator(
         task_id='task1',
         mysql_conn_id='mysql_default',
         sql='''CREATE TABLE employee(
                    emp_id INTEGER NOT NULL,
                    emp_name VARCHAR(50) NOT NULL,
                    emp_title VARCHAR(50));''',
         dag=dag
     )
     task2 = MySqlOperator(
         task_id='task2',
         mysql_conn_id='mysql_default',
         sql='INSERT INTO employee (emp_id, emp_name, emp_title) VALUES (%s, %s, %s)',
         parameters=(1, "abc", "HR"),
         dag=dag
     )
     task1 >> task2
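
     sql also accepts a list of statements, and parameters can be a mapping used with pyformat-style placeholders. A hedged sketch reusing the connection and table from the example above:

     cleanup = MySqlOperator(
         task_id='cleanup',
         mysql_conn_id='mysql_default',
         sql=[
             "DELETE FROM employee WHERE emp_id = %(emp_id)s",
             "SELECT COUNT(*) FROM employee",
         ],
         # The same parameters mapping is applied to each statement in the list
         parameters={'emp_id': 1},
         dag=dag
     )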
    
  6. HttpSensor -

     from airflow import DAG
     from airflow.providers.http.sensors.http import HttpSensor
     from datetime import datetime
    
     default_args = {'owner': 'airflow', 'start_date': datetime(2024, 1, 1)}
    
     dag = DAG('http_sensor_example_dag', default_args=default_args, schedule_interval='@daily')
    
     http_sensor_task = HttpSensor(
         task_id='http_sensor_task',
         http_conn_id='http_conn_id',   # Airflow connection that defines the base URL, e.g. http://example.com
         endpoint='health',
         poke_interval=60,
         timeout=120,
         dag=dag
     )
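
     The sensor calls the endpoint on every poke_interval until the request succeeds or the timeout is reached. A response_check callable can be added to assert something about the response body; a minimal sketch (the 'status' field is just an assumed payload):

     healthy_sensor = HttpSensor(
         task_id='healthy_sensor',
         http_conn_id='http_conn_id',
         endpoint='health',
         # Only succeed when the service reports itself as healthy
         response_check=lambda response: response.json().get('status') == 'ok',
         poke_interval=60,
         timeout=120,
         dag=dag
     )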
    
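  7. S3ToHiveOperator -

     A hedged sketch of the transfer operator listed above. It assumes the Amazon and Apache Hive provider packages are installed and that 'aws_default' / 'hive_cli_default' connections exist; the bucket, key and table names are illustrative:

     from datetime import datetime
     from airflow import DAG
     from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator

     default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}

     dag = DAG('s3_to_hive_example_dag', default_args=default_args, schedule_interval='@daily')

     s3_to_hive = S3ToHiveOperator(
         task_id='s3_to_hive',
         # Full S3 URL of the source file
         s3_key='s3://my-bucket/data/employees.csv',
         # Column name -> Hive type mapping for the target table
         field_dict={'emp_id': 'INT', 'emp_name': 'STRING', 'emp_title': 'STRING'},
         hive_table='employee_staging',
         delimiter=',',
         headers=True,
         check_headers=True,
         aws_conn_id='aws_default',
         hive_cli_conn_id='hive_cli_default',
         dag=dag
     )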

Written by

samyak jain

Hi there, I'm Samyak Jain, a seasoned data & analytics professional with a problem-solving mindset, passionate about solving challenging real-world problems using data and technology.