Tổng quan: Airflow + Spark Operator

Việc tích hợp Apache Airflow với Spark Operator cho phép bạn orchestration (điều phối) các workflow xử lý dữ liệu lớn một cách linh hoạt trong Kubernetes.
🧠 Tổng quan: Airflow + Spark Operator
Apache Airflow là một công cụ quản lý workflow, giúp bạn lập lịch, theo dõi và điều phối các công việc (tasks) theo định dạng DAG (Directed Acyclic Graph).
Spark Operator giúp bạn chạy các ứng dụng Apache Spark trên Kubernetes bằng cách tạo tài nguyên
SparkApplication
.
Mục tiêu là: Airflow sẽ tạo và giám sát tài nguyên SparkApplication
để trigger job Spark trên Kubernetes.'
🔗 Cơ chế kết nối giữa Airflow và Spark Operator
1. Airflow tạo SparkApplication
YAML
Bạn sẽ dùng một task trong DAG để tạo một YAML định nghĩa SparkApplication (hoặc dùng sẵn YAML template).
2. Airflow gọi Kubernetes API
Sử dụng một trong các cách sau:
PythonOperator +
kubernetes.client
(client SDK)KubernetesPodOperator
SparkKubernetesOperator
Kubernetes API Call trực tiếp (tự cấu hình service account/token)
CustomOperator dành riêng cho Spark Operator (tự viết nếu cần)
3. Spark Operator tiếp nhận YAML
Khi SparkApplication
được tạo trong namespace Kubernetes, Spark Operator sẽ:
Deploy Driver Pod
Driver Pod tạo Executor Pods (theo cấu hình YAML)
4. Airflow giám sát trạng thái SparkApplication
Bạn có thể giám sát bằng cách:
Truy vấn trạng thái từ CRD
SparkApplication
Sử dụng sensor để poll trạng thái (
status.applicationState.state == COMPLETED
)
Ví dụ thực tế: Airflow DAG gọi Spark Operator (Ít dùng)
Cài đặt yêu cầu:
Airflow phải chạy trong cùng cluster (hoặc có quyền truy cập
kubeconfig
)Spark Operator đã được cài đặt
Tạo service account để Airflow có thể tạo SparkApplication
DAG ví dụ (Python)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import yaml
from kubernetes import client, config
def submit_spark_app(**kwargs):
config.load_incluster_config() # hoặc load_kube_config() nếu chạy local
api = client.CustomObjectsApi()
with open('/opt/airflow/dags/spark-pi.yaml') as f:
body = yaml.safe_load(f)
api.create_namespaced_custom_object(
group="sparkoperator.k8s.io",
version="v1beta2",
namespace="default",
plural="sparkapplications",
body=body
)
with DAG(
'spark_operator_example',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
submit_spark = PythonOperator(
task_id='submit_spark_pi',
python_callable=submit_spark_app
)
File spark-pi.yaml
(đặt trong /opt/airflow/dags/
)
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi-from-airflow
namespace: default
spec:
type: Scala
mode: cluster
image: gcr.io/spark-operator/spark:v3.3.0
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0.jar
sparkVersion: 3.3.0
driver:
cores: 1
memory: 512m
serviceAccount: spark
executor:
cores: 1
instances: 2
memory: 512m
🧩 Mở rộng thêm
✅ Airflow XComs: Bạn có thể lấy trạng thái
SparkApplication
và truyền thông tin sang task kế tiếp.📦 Airflow Sensors: Viết custom sensor để chờ cho đến khi SparkApplication chuyển sang trạng thái
COMPLETED
hoặcFAILED
.🔁 Retries và auto-recovery: Dùng các thông số như
retries
,retry_delay
, vàtrigger_rule
để quản lý lỗi khi job Spark fail.
Sự khác biệt và điểm tương đồng giữa SparkKubernetesOperator
và KubernetesPodOperator
:
🧩 1. Điểm giống nhau
Tiêu chí | SparkKubernetesOperator | KubernetesPodOperator |
Chạy trong Kubernetes | ✅ | ✅ |
Tạo Pod trong cluster | ✅ | ✅ |
Dùng trong DAG của Airflow | ✅ | ✅ |
Có thể truyền image, env, volume | ✅ (gián tiếp qua YAML SparkApplication) | ✅ (trực tiếp qua params) |
Theo dõi trạng thái Pod | ✅ (qua trạng thái SparkApplication) | ✅ (trực tiếp qua trạng thái Pod) |
🔍 2. Khác biệt chính
Tiêu chí | SparkKubernetesOperator | KubernetesPodOperator |
Mục đích chính | Gửi và giám sát SparkApplication (CRD) thông qua Spark Operator | Chạy bất kỳ container nào trong Pod |
Tích hợp Spark tốt hơn | ✅ Có sẵn logic hiểu trạng thái SparkApplication | ❌ Không hiểu Spark, chỉ là container |
Yêu cầu Spark Operator | ✅ Bắt buộc | ❌ Không cần |
Cách chạy Spark | Spark Operator sẽ tạo Driver và Executor Pods từ CRD | Bạn tự build image Spark và chạy lệnh spark-submit thủ công |
Theo dõi job Spark | Theo trạng thái CRD (RUNNING , COMPLETED , FAILED ) | Bạn phải log/parse thủ công từ stdout |
Code phức tạp | Dễ dùng hơn khi đã có SparkOperator và YAML mẫu | Cần tự viết câu lệnh spark-submit đầy đủ, pass args, volumes, etc. |
Quản lý lifecycle tốt hơn | ✅ SparkOperator tự quản lý toàn bộ vòng đời job | ❌ Chỉ quản lý lifecycle của chính Pod đó |
💡 Tình huống nên dùng cái nào?
Nhu cầu | Nên dùng |
Đã cài Spark Operator | SparkKubernetesOperator |
Muốn chạy job Spark nhanh, không cần nhiều YAML | KubernetesPodOperator (dùng image Spark custom) |
Chạy job không phải Spark (ETL nhỏ, bash, python, etc) | KubernetesPodOperator |
Cần log + monitor Spark job chuyên nghiệp | SparkKubernetesOperator |
Muốn re-use SparkApplication YAML cho nhiều job | SparkKubernetesOperator |
Spark đang chạy trên hệ thống khác (EMR, Dataproc, etc) | ❌ Không dùng cái nào, nên dùng operator riêng cho backend đó |
Ví dụ thực tế dùng SparkKubernetesOperator
trong Airflow để chạy một job PySpark
🎯 Mục tiêu
Airflow DAG sẽ submit một PySpark application đến Spark Operator trên Kubernetes.
Job sẽ thực hiện một phép tính đơn giản (đếm số dòng trong file CSV hoặc in log).
Dùng
SparkKubernetesOperator
(phần của Airflow providercncf.kubernetes
).
🏗️ Kiến trúc tổng quan
+------------+ +----------------------+ +-------------------+
| Airflow DAG| ---> | SparkKubernetesOperator| ---> | Spark Operator |
+------------+ +----------------------+ +-------------------+
↘
+-----------+
| Driver Pod|
+-----------+
↘
+----------+
|Executors |
+----------+
📁 Cấu trúc thư mục
dags/
├── pyspark_example.py <-- DAG định nghĩa airflow
├── spark-py.yaml <-- SparkApplication YAML
└── app/
└── main.py <-- PySpark script
🐍 main.py
— PySpark Script
# dags/app/main.py
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession.builder.appName("PySpark Airflow Job").getOrCreate()
data = spark.range(1, 100)
count = data.count()
print(f"Total count: {count}")
spark.stop()
📝 spark-py.yaml
— SparkApplication YAML
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: pyspark-airflow-job
namespace: default
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: your-repo/pyspark-app:latest # <-- image đã build sẵn chứa main.py
imagePullPolicy: Always
mainApplicationFile: local:///app/main.py
sparkVersion: "3.3.0"
restartPolicy:
type: Never
driver:
cores: 1
memory: "512m"
serviceAccount: spark # service account có quyền
labels:
version: 3.3.0
executor:
cores: 1
instances: 2
memory: "512m"
labels:
version: 3.3.0
🔧 Lưu ý: Bạn cần build Docker image chứa thư mục
/app/
main.py
. Ví dụDockerfile
:
DockerfileCopyEditFROM gcr.io/spark-operator/spark-py:v3.3.0
COPY app /app
🪄 pyspark_
example.py
— Airflow DAG
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from datetime import datetime
with DAG(
dag_id="pyspark_airflow_operator",
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
tags=["spark", "pyspark", "k8s"],
) as dag:
submit_spark_job = SparkKubernetesOperator(
task_id="submit_pyspark_job",
namespace="default",
application_file="dags/spark-py.yaml",
do_xcom_push=True
)
🔍 Giải thích chuyên sâu
Thành phần | Vai trò |
SparkKubernetesOperator | Gửi SparkApplication CRD đến Spark Operator |
application_file | Chứa cấu hình job (giống như spark-submit) |
mainApplicationFile | Đường dẫn đến file Python trong container |
do_xcom_push=True | Cho phép DAG lấy trạng thái của job trả về |
🧪 Cách chạy
Build và push Docker image:
docker build -t your-repo/pyspark-app:latest . docker push your-repo/pyspark-app:latest
Khởi chạy DAG trong Airflow UI →
pyspark_airflow_operator
→ Trigger DAGTruy cập Spark Operator UI hoặc
kubectl get sparkapplications
để kiểm tra trạng thái.
Ví dụ thực tế dùng KubernetesPodOperator
để chạy PySpark trong một DAG của Apache Airflow, không dùng Spark Operator, phù hợp trong các tình huống :
chưa cài Spark Operator,
hoặc cần kiểm soát trực tiếp câu lệnh
spark-submit
,hoặc muốn đơn giản hóa hệ thống.
🎯 Mục tiêu
Dùng
KubernetesPodOperator
để tạo một Pod chạyspark-submit
trên Kubernetes cluster.Job thực hiện thao tác PySpark đơn giản (in số dòng).
Airflow theo dõi trạng thái Pod như một task.
🧱 Yêu cầu
Bạn đã cài Airflow trên Kubernetes hoặc Local + có kubeconfig trỏ vào cluster.
Cluster có Spark được đóng gói sẵn trong image (ví dụ:
gcr.io/spark-operator/spark-py:v3.3.0
).Có một container image chứa PySpark script.
🐍 PySpark Script – main.py
# main.py
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession.builder.appName("pyspark_k8s").getOrCreate()
df = spark.range(1, 100)
print(f"Row count = {df.count()}")
spark.stop()
🐳 Dockerfile – để tạo image Spark + script
# Dockerfile
FROM gcr.io/spark-operator/spark-py:v3.3.0
COPY main.py /opt/spark/work-dir/main.py
Build & push:
docker build -t your-registry/pyspark-pod:latest .
docker push your-registry/pyspark-pod:latest
🪄 Airflow DAG dùng KubernetesPodOperator
pythonCopyEditfrom airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
with DAG(
dag_id="pyspark_k8s_pod_operator",
start_date=datetime(2024, 1, 1),
schedule_interval=None,
catchup=False,
tags=["spark", "k8s", "pod-operator"]
) as dag:
run_spark = KubernetesPodOperator(
task_id="run_pyspark",
name="spark-job",
namespace="default",
image="your-registry/pyspark-pod:latest",
cmds=["/opt/spark/bin/spark-submit"],
arguments=[
"--master", "k8s://https://kubernetes.default.svc",
"--deploy-mode", "client", # Hoặc "cluster" nếu spark driver nằm trong Pod
"--conf", "spark.kubernetes.container.image=your-registry/pyspark-pod:latest",
"local:///opt/spark/work-dir/main.py"
],
get_logs=True,
is_delete_operator_pod=True,
)
🧠 Giải thích chi tiết
Thành phần | Mục đích |
image | Image chứa Spark + main.py |
cmds + arguments | Là câu lệnh spark-submit như bạn hay chạy trên terminal |
--master | Trỏ đến Kubernetes API (k8s://... ) |
--deploy-mode | "cluster" thì driver nằm trong pod Spark, "client" thì driver nằm trong pod chạy spark-submit |
main.py | PySpark script nằm sẵn trong container |
is_delete_operator_pod=True | Tự động xóa pod sau khi xong |
get_logs=True | Lấy log PySpark về Airflow task log |
✅ Kết quả
Khi bạn trigger DAG, Airflow sẽ tạo một Pod chạy lệnh
spark-submit
.Pod sẽ sinh thêm các executor pod nếu là
--deploy-mode cluster
.Log kết quả sẽ xuất hiện trong phần log của task Airflow.
Không cần Spark Operator, đơn giản hoá quản trị.
Subscribe to my newsletter
Read articles from Kilo directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
