Hướng Dẫn Tích Hợp Apache Airflow Với Spark Operator Trên Kubernetes

KiloKilo
4 min read

Apache Airflow là một công cụ orchestration mạnh mẽ cho phép bạn định nghĩa các luồng công việc phức tạp bằng Python. Apache Spark lại là một nền tảng xử lý dữ liệu lớn phổ biến. Khi cả hai được triển khai cùng trên Kubernetes, bạn có thể thực hiện các job xử lý dữ liệu phân tán ngay trong pipeline của Airflow một cách linh hoạt, tự động và có khả năng mở rộng cao.

Trong bài viết này, mình sẽ hướng dẫn bạn cách:

  • Cài đặt Spark Operator trên Kubernetes

  • Tạo một job Spark dưới dạng SparkApplication

  • Gọi Spark job từ Airflow thông qua KubernetesPodOperator


Kiến trúc tổng thể

Khi tích hợp Airflow và Spark Operator, chúng ta không dùng SparkSubmitOperator truyền thống (yêu cầu cài đặt Spark bên trong Airflow), mà dùng Spark Operator để tạo và theo dõi Spark job dưới dạng CRD (Custom Resource Definition).

Mô hình hoạt động:

+------------------+     kubectl apply SparkApplication     +--------------------+
|     Airflow      | ------------------------------------> |   Spark Operator    |
| (Kubernetes DAG) |                                        | (Custom Controller) |
+------------------+                                        +----------+---------+
                                                                     |
                                                                     v
                                                        +----------------------------+
                                                        |  Driver & Executor Pods     |
                                                        |  Sinh ra & quản lý tự động |
                                                        +----------------------------+

Bước 1: Cài đặt Spark Operator

Sử dụng Helm:

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm repo update

helm install spark-operator spark-operator/spark-operator \
  --namespace spark-operator --create-namespace \
  --set sparkJobNamespace=default \
  --set webhook.enable=true \
  --set serviceAccounts.spark.name=spark

Bước 2: Tạo ServiceAccount spark

Spark Operator yêu cầu một ServiceAccount có quyền tạo Pod:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: default
roleRef:
  kind: ClusterRole
  name: spark-operator-role
  apiGroup: rbac.authorization.k8s.io
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: default

Apply:

kubectl apply -f spark-serviceaccount.yaml

Bước 3: Tạo Spark job dưới dạng YAML

File spark_pi.yaml:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Python
  mode: cluster
  image: gcr.io/spark-operator/spark-py:v3.3.0
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  sparkVersion: "3.3.0"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "512m"
    serviceAccount: spark
  executor:
    cores: 1
    instances: 2
    memory: "512m"

File spark_pi.yaml là một tài nguyên Custom Resource (CRD) loại SparkApplication được Spark Operator định nghĩa và xử lý trong Kubernetes. Mục đích chính của file này là mô tả đầy đủ một Spark job mà Spark Operator sẽ thực thi.

File spark_pi.yaml có nhiệm vụ:

✅ Định nghĩa Spark job mà bạn muốn chạy
✅ Cho biết Spark dùng image nào, chạy file gì, dùng bao nhiêu tài nguyên
✅ Tự động triển khai và theo dõi job trên Kubernetes thông qua Spark Operator

Bước 4: Airflow DAG gọi Spark job

Tạo file DAG tên submit_spark_pi.py:

from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG("submit_spark_pi",
         start_date=datetime(2024, 1, 1),
         schedule_interval=None,
         catchup=False) as dag:

    submit_spark_job = KubernetesPodOperator(
        task_id="submit_spark_pi_job",
        name="apply-spark-pi",
        namespace="default",
        image="bitnami/kubectl:latest",
        cmds=["kubectl", "apply", "-f", "/opt/airflow/spark_pi.yaml"],
        volume_mounts=[{
            "name": "dag-volume",
            "mountPath": "/opt/airflow"
        }],
        volumes=[{
            "name": "dag-volume",
            "hostPath": {
                "path": "/path/tới/thư/mục/dags",  # sửa lại cho đúng
                "type": "Directory"
            }
        }],
        get_logs=True,
        is_delete_operator_pod=True,
        in_cluster=True
    )

File submit_spark_pi.py là một Airflow DAG file (Directed Acyclic Graph) – tức là một định nghĩa bằng mã Python về luồng công việc (workflow) trong Apache Airflow. File này quy định cách mà Airflow sẽ submit một Spark job lên Kubernetes thông qua Spark Operator.

Nhiệm vụ chính của file này là:

✅ Tạo một DAG (luồng công việc) trong Airflow
✅ Dùng KubernetesPodOperator để khởi chạy một Pod mới
✅ Pod đó sẽ chạy lệnh kubectl apply -f spark_pi.yaml → gửi Spark job tới Spark Operator
✅ Giúp tự động hóa việc chạy Spark job ngay trong pipeline Airflow

Bước 5: Kích hoạt và giám sát

  1. Truy cập UI Airflow (http://<airflow-host>:8080)

  2. Trigger DAG submit_spark_pi

  3. Quan sát log task

  4. Kiểm tra Spark job:

kubectl get sparkapplication spark-pi
kubectl logs -f <driver-pod-name>

Bước 6: (Tuỳ chọn) Theo dõi trạng thái job từ DAG

Thêm một task dùng KubernetesPodOperator:

kubectl get sparkapplication spark-pi -o jsonpath='{.status.applicationState.state}'

Bạn có thể dùng để báo cáo job đã COMPLETED, RUNNING hoặc FAILED.

0
Subscribe to my newsletter

Read articles from Kilo directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Kilo
Kilo