Tổng quan: Airflow + Spark Operator

KiloKilo
8 min read

Việc tích hợp Apache Airflow với Spark Operator cho phép bạn orchestration (điều phối) các workflow xử lý dữ liệu lớn một cách linh hoạt trong Kubernetes.

🧠 Tổng quan: Airflow + Spark Operator

  • Apache Airflow là một công cụ quản lý workflow, giúp bạn lập lịch, theo dõi và điều phối các công việc (tasks) theo định dạng DAG (Directed Acyclic Graph).

  • Spark Operator giúp bạn chạy các ứng dụng Apache Spark trên Kubernetes bằng cách tạo tài nguyên SparkApplication.

Mục tiêu là: Airflow sẽ tạo và giám sát tài nguyên SparkApplication để trigger job Spark trên Kubernetes.'

🔗 Cơ chế kết nối giữa Airflow và Spark Operator

1. Airflow tạo SparkApplication YAML

Bạn sẽ dùng một task trong DAG để tạo một YAML định nghĩa SparkApplication (hoặc dùng sẵn YAML template).

2. Airflow gọi Kubernetes API

Sử dụng một trong các cách sau:

  • PythonOperator + kubernetes.client (client SDK)

  • KubernetesPodOperator

  • SparkKubernetesOperator

  • Kubernetes API Call trực tiếp (tự cấu hình service account/token)

  • CustomOperator dành riêng cho Spark Operator (tự viết nếu cần)

3. Spark Operator tiếp nhận YAML

Khi SparkApplication được tạo trong namespace Kubernetes, Spark Operator sẽ:

  • Deploy Driver Pod

  • Driver Pod tạo Executor Pods (theo cấu hình YAML)

4. Airflow giám sát trạng thái SparkApplication

Bạn có thể giám sát bằng cách:

  • Truy vấn trạng thái từ CRD SparkApplication

  • Sử dụng sensor để poll trạng thái (status.applicationState.state == COMPLETED)

Ví dụ thực tế: Airflow DAG gọi Spark Operator (Ít dùng)

Cài đặt yêu cầu:

  • Airflow phải chạy trong cùng cluster (hoặc có quyền truy cập kubeconfig)

  • Spark Operator đã được cài đặt

  • Tạo service account để Airflow có thể tạo SparkApplication

DAG ví dụ (Python)

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import yaml
from kubernetes import client, config

def submit_spark_app(**kwargs):
    config.load_incluster_config()  # hoặc load_kube_config() nếu chạy local
    api = client.CustomObjectsApi()

    with open('/opt/airflow/dags/spark-pi.yaml') as f:
        body = yaml.safe_load(f)

    api.create_namespaced_custom_object(
        group="sparkoperator.k8s.io",
        version="v1beta2",
        namespace="default",
        plural="sparkapplications",
        body=body
    )

with DAG(
    'spark_operator_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:

    submit_spark = PythonOperator(
        task_id='submit_spark_pi',
        python_callable=submit_spark_app
    )

File spark-pi.yaml (đặt trong /opt/airflow/dags/)

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-from-airflow
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: gcr.io/spark-operator/spark:v3.3.0
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0.jar
  sparkVersion: 3.3.0
  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark
  executor:
    cores: 1
    instances: 2
    memory: 512m

🧩 Mở rộng thêm

  • Airflow XComs: Bạn có thể lấy trạng thái SparkApplication và truyền thông tin sang task kế tiếp.

  • 📦 Airflow Sensors: Viết custom sensor để chờ cho đến khi SparkApplication chuyển sang trạng thái COMPLETED hoặc FAILED.

  • 🔁 Retries và auto-recovery: Dùng các thông số như retries, retry_delay, và trigger_rule để quản lý lỗi khi job Spark fail.

Sự khác biệt và điểm tương đồng giữa SparkKubernetesOperator KubernetesPodOperator:

🧩 1. Điểm giống nhau

Tiêu chíSparkKubernetesOperatorKubernetesPodOperator
Chạy trong Kubernetes
Tạo Pod trong cluster
Dùng trong DAG của Airflow
Có thể truyền image, env, volume✅ (gián tiếp qua YAML SparkApplication)✅ (trực tiếp qua params)
Theo dõi trạng thái Pod✅ (qua trạng thái SparkApplication)✅ (trực tiếp qua trạng thái Pod)

🔍 2. Khác biệt chính

Tiêu chíSparkKubernetesOperatorKubernetesPodOperator
Mục đích chínhGửi và giám sát SparkApplication (CRD) thông qua Spark OperatorChạy bất kỳ container nào trong Pod
Tích hợp Spark tốt hơn✅ Có sẵn logic hiểu trạng thái SparkApplication❌ Không hiểu Spark, chỉ là container
Yêu cầu Spark Operator✅ Bắt buộc❌ Không cần
Cách chạy SparkSpark Operator sẽ tạo Driver và Executor Pods từ CRDBạn tự build image Spark và chạy lệnh spark-submit thủ công
Theo dõi job SparkTheo trạng thái CRD (RUNNING, COMPLETED, FAILED)Bạn phải log/parse thủ công từ stdout
Code phức tạpDễ dùng hơn khi đã có SparkOperator và YAML mẫuCần tự viết câu lệnh spark-submit đầy đủ, pass args, volumes, etc.
Quản lý lifecycle tốt hơn✅ SparkOperator tự quản lý toàn bộ vòng đời job❌ Chỉ quản lý lifecycle của chính Pod đó

💡 Tình huống nên dùng cái nào?

Nhu cầuNên dùng
Đã cài Spark OperatorSparkKubernetesOperator
Muốn chạy job Spark nhanh, không cần nhiều YAMLKubernetesPodOperator (dùng image Spark custom)
Chạy job không phải Spark (ETL nhỏ, bash, python, etc)KubernetesPodOperator
Cần log + monitor Spark job chuyên nghiệpSparkKubernetesOperator
Muốn re-use SparkApplication YAML cho nhiều jobSparkKubernetesOperator
Spark đang chạy trên hệ thống khác (EMR, Dataproc, etc)❌ Không dùng cái nào, nên dùng operator riêng cho backend đó

Ví dụ thực tế dùng SparkKubernetesOperator trong Airflow để chạy một job PySpark

🎯 Mục tiêu

  • Airflow DAG sẽ submit một PySpark application đến Spark Operator trên Kubernetes.

  • Job sẽ thực hiện một phép tính đơn giản (đếm số dòng trong file CSV hoặc in log).

  • Dùng SparkKubernetesOperator (phần của Airflow provider cncf.kubernetes).

🏗️ Kiến trúc tổng quan

          +------------+         +----------------------+         +-------------------+
          | Airflow DAG|  --->   | SparkKubernetesOperator| --->   | Spark Operator     |
          +------------+         +----------------------+         +-------------------+
                                                           ↘
                                                        +-----------+
                                                        | Driver Pod|
                                                        +-----------+
                                                             ↘
                                                         +----------+
                                                         |Executors |
                                                         +----------+

📁 Cấu trúc thư mục

dags/
├── pyspark_example.py      <-- DAG định nghĩa airflow
├── spark-py.yaml           <-- SparkApplication YAML
└── app/
    └── main.py             <-- PySpark script

🐍 main.py — PySpark Script

# dags/app/main.py
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("PySpark Airflow Job").getOrCreate()
    data = spark.range(1, 100)
    count = data.count()
    print(f"Total count: {count}")
    spark.stop()

📝 spark-py.yaml — SparkApplication YAML

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: pyspark-airflow-job
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: your-repo/pyspark-app:latest  # <-- image đã build sẵn chứa main.py
  imagePullPolicy: Always
  mainApplicationFile: local:///app/main.py
  sparkVersion: "3.3.0"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "512m"
    serviceAccount: spark  # service account có quyền
    labels:
      version: 3.3.0
  executor:
    cores: 1
    instances: 2
    memory: "512m"
    labels:
      version: 3.3.0

🔧 Lưu ý: Bạn cần build Docker image chứa thư mục /app/main.py. Ví dụ Dockerfile:

DockerfileCopyEditFROM gcr.io/spark-operator/spark-py:v3.3.0
COPY app /app

🪄 pyspark_example.py — Airflow DAG

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from datetime import datetime

with DAG(
    dag_id="pyspark_airflow_operator",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=["spark", "pyspark", "k8s"],
) as dag:

    submit_spark_job = SparkKubernetesOperator(
        task_id="submit_pyspark_job",
        namespace="default",
        application_file="dags/spark-py.yaml",
        do_xcom_push=True
    )

🔍 Giải thích chuyên sâu

Thành phầnVai trò
SparkKubernetesOperatorGửi SparkApplication CRD đến Spark Operator
application_fileChứa cấu hình job (giống như spark-submit)
mainApplicationFileĐường dẫn đến file Python trong container
do_xcom_push=TrueCho phép DAG lấy trạng thái của job trả về

🧪 Cách chạy

  1. Build và push Docker image:

     docker build -t your-repo/pyspark-app:latest .
     docker push your-repo/pyspark-app:latest
    
  2. Khởi chạy DAG trong Airflow UI → pyspark_airflow_operator → Trigger DAG

  3. Truy cập Spark Operator UI hoặc kubectl get sparkapplications để kiểm tra trạng thái.

Ví dụ thực tế dùng KubernetesPodOperator để chạy PySpark trong một DAG của Apache Airflow, không dùng Spark Operator, phù hợp trong các tình huống :

  • chưa cài Spark Operator,

  • hoặc cần kiểm soát trực tiếp câu lệnh spark-submit,

  • hoặc muốn đơn giản hóa hệ thống.

🎯 Mục tiêu

  • Dùng KubernetesPodOperator để tạo một Pod chạy spark-submit trên Kubernetes cluster.

  • Job thực hiện thao tác PySpark đơn giản (in số dòng).

  • Airflow theo dõi trạng thái Pod như một task.

🧱 Yêu cầu

  • Bạn đã cài Airflow trên Kubernetes hoặc Local + có kubeconfig trỏ vào cluster.

  • Cluster có Spark được đóng gói sẵn trong image (ví dụ: gcr.io/spark-operator/spark-py:v3.3.0).

  • Có một container image chứa PySpark script.

🐍 PySpark Script – main.py

# main.py
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("pyspark_k8s").getOrCreate()
    df = spark.range(1, 100)
    print(f"Row count = {df.count()}")
    spark.stop()

🐳 Dockerfile – để tạo image Spark + script

# Dockerfile
FROM gcr.io/spark-operator/spark-py:v3.3.0
COPY main.py /opt/spark/work-dir/main.py

Build & push:

docker build -t your-registry/pyspark-pod:latest .
docker push your-registry/pyspark-pod:latest

🪄 Airflow DAG dùng KubernetesPodOperator

pythonCopyEditfrom airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG(
    dag_id="pyspark_k8s_pod_operator",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=["spark", "k8s", "pod-operator"]
) as dag:

    run_spark = KubernetesPodOperator(
        task_id="run_pyspark",
        name="spark-job",
        namespace="default",
        image="your-registry/pyspark-pod:latest",
        cmds=["/opt/spark/bin/spark-submit"],
        arguments=[
            "--master", "k8s://https://kubernetes.default.svc",
            "--deploy-mode", "client",  # Hoặc "cluster" nếu spark driver nằm trong Pod
            "--conf", "spark.kubernetes.container.image=your-registry/pyspark-pod:latest",
            "local:///opt/spark/work-dir/main.py"
        ],
        get_logs=True,
        is_delete_operator_pod=True,
    )

🧠 Giải thích chi tiết

Thành phầnMục đích
imageImage chứa Spark + main.py
cmds + argumentsLà câu lệnh spark-submit như bạn hay chạy trên terminal
--masterTrỏ đến Kubernetes API (k8s://...)
--deploy-mode"cluster" thì driver nằm trong pod Spark, "client" thì driver nằm trong pod chạy spark-submit
main.pyPySpark script nằm sẵn trong container
is_delete_operator_pod=TrueTự động xóa pod sau khi xong
get_logs=TrueLấy log PySpark về Airflow task log

✅ Kết quả

  • Khi bạn trigger DAG, Airflow sẽ tạo một Pod chạy lệnh spark-submit.

  • Pod sẽ sinh thêm các executor pod nếu là --deploy-mode cluster.

  • Log kết quả sẽ xuất hiện trong phần log của task Airflow.

  • Không cần Spark Operator, đơn giản hoá quản trị.

0
Subscribe to my newsletter

Read articles from Kilo directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Kilo
Kilo