Hướng Dẫn Tích Hợp Apache Airflow Với Spark Operator Trên Kubernetes

Apache Airflow là một công cụ orchestration mạnh mẽ cho phép bạn định nghĩa các luồng công việc phức tạp bằng Python. Apache Spark lại là một nền tảng xử lý dữ liệu lớn phổ biến. Khi cả hai được triển khai cùng trên Kubernetes, bạn có thể thực hiện các job xử lý dữ liệu phân tán ngay trong pipeline của Airflow một cách linh hoạt, tự động và có khả năng mở rộng cao.
Trong bài viết này, mình sẽ hướng dẫn bạn cách:
Cài đặt Spark Operator trên Kubernetes
Tạo một job Spark dưới dạng
SparkApplication
Gọi Spark job từ Airflow thông qua
KubernetesPodOperator
Kiến trúc tổng thể
Khi tích hợp Airflow và Spark Operator, chúng ta không dùng SparkSubmitOperator
truyền thống (yêu cầu cài đặt Spark bên trong Airflow), mà dùng Spark Operator để tạo và theo dõi Spark job dưới dạng CRD (Custom Resource Definition).
Mô hình hoạt động:
+------------------+ kubectl apply SparkApplication +--------------------+
| Airflow | ------------------------------------> | Spark Operator |
| (Kubernetes DAG) | | (Custom Controller) |
+------------------+ +----------+---------+
|
v
+----------------------------+
| Driver & Executor Pods |
| Sinh ra & quản lý tự động |
+----------------------------+
Bước 1: Cài đặt Spark Operator
Sử dụng Helm:
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm repo update
helm install spark-operator spark-operator/spark-operator \
--namespace spark-operator --create-namespace \
--set sparkJobNamespace=default \
--set webhook.enable=true \
--set serviceAccounts.spark.name=spark
Bước 2: Tạo ServiceAccount spark
Spark Operator yêu cầu một ServiceAccount có quyền tạo Pod:
apiVersion: v1
kind: ServiceAccount
metadata:
name: spark
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-role-binding
namespace: default
roleRef:
kind: ClusterRole
name: spark-operator-role
apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
name: spark
namespace: default
Apply:
kubectl apply -f spark-serviceaccount.yaml
Bước 3: Tạo Spark job dưới dạng YAML
File spark_pi.yaml
:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
spec:
type: Python
mode: cluster
image: gcr.io/spark-operator/spark-py:v3.3.0
imagePullPolicy: Always
mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
sparkVersion: "3.3.0"
restartPolicy:
type: Never
driver:
cores: 1
memory: "512m"
serviceAccount: spark
executor:
cores: 1
instances: 2
memory: "512m"
File spark_pi.yaml
là một tài nguyên Custom Resource (CRD) loại SparkApplication
được Spark Operator định nghĩa và xử lý trong Kubernetes. Mục đích chính của file này là mô tả đầy đủ một Spark job mà Spark Operator sẽ thực thi.
File spark_pi.yaml
có nhiệm vụ:
✅ Định nghĩa Spark job mà bạn muốn chạy
✅ Cho biết Spark dùng image nào, chạy file gì, dùng bao nhiêu tài nguyên
✅ Tự động triển khai và theo dõi job trên Kubernetes thông qua Spark Operator
Bước 4: Airflow DAG gọi Spark job
Tạo file DAG tên submit_spark_
pi.py
:
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
with DAG("submit_spark_pi",
start_date=datetime(2024, 1, 1),
schedule_interval=None,
catchup=False) as dag:
submit_spark_job = KubernetesPodOperator(
task_id="submit_spark_pi_job",
name="apply-spark-pi",
namespace="default",
image="bitnami/kubectl:latest",
cmds=["kubectl", "apply", "-f", "/opt/airflow/spark_pi.yaml"],
volume_mounts=[{
"name": "dag-volume",
"mountPath": "/opt/airflow"
}],
volumes=[{
"name": "dag-volume",
"hostPath": {
"path": "/path/tới/thư/mục/dags", # sửa lại cho đúng
"type": "Directory"
}
}],
get_logs=True,
is_delete_operator_pod=True,
in_cluster=True
)
File submit_spark_
pi.py
là một Airflow DAG file (Directed Acyclic Graph) – tức là một định nghĩa bằng mã Python về luồng công việc (workflow) trong Apache Airflow. File này quy định cách mà Airflow sẽ submit một Spark job lên Kubernetes thông qua Spark Operator.
Nhiệm vụ chính của file này là:
✅ Tạo một DAG (luồng công việc) trong Airflow
✅ Dùng KubernetesPodOperator
để khởi chạy một Pod mới
✅ Pod đó sẽ chạy lệnh kubectl apply -f spark_pi.yaml
→ gửi Spark job tới Spark Operator
✅ Giúp tự động hóa việc chạy Spark job ngay trong pipeline Airflow
Bước 5: Kích hoạt và giám sát
Truy cập UI Airflow (
http://<airflow-host>:8080
)Trigger DAG
submit_spark_pi
Quan sát log task
Kiểm tra Spark job:
kubectl get sparkapplication spark-pi
kubectl logs -f <driver-pod-name>
Bước 6: (Tuỳ chọn) Theo dõi trạng thái job từ DAG
Thêm một task dùng KubernetesPodOperator
:
kubectl get sparkapplication spark-pi -o jsonpath='{.status.applicationState.state}'
Bạn có thể dùng để báo cáo job đã COMPLETED, RUNNING hoặc FAILED.
Subscribe to my newsletter
Read articles from Kilo directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
