Tối ưu hóa việc chạy Spark Jobs trong Airflow với một Template SparkApplication

Apache Airflow là một công cụ mạnh mẽ để lập lịch và quản lý các luồng công việc, trong khi Apache Spark là nền tảng lý tưởng cho xử lý dữ liệu lớn. Khi triển khai Spark trên Kubernetes, việc tích hợp Airflow với Spark thông qua SparkKubernetesOperator mang lại sự linh hoạt và khả năng mở rộng. Tuy nhiên, nếu bạn có nhiều DAG cần chạy các Spark job khác nhau, việc lặp lại cấu hình SparkApplication có thể gây tốn thời gian và dễ xảy ra lỗi. Trong bài blog này, chúng ta sẽ khám phá cách sử dụng một file template SparkApplication duy nhất để chạy các Spark job trong Airflow, giúp tiết kiệm thời gian và tăng tính tái sử dụng.
Vấn đề: Quản lý nhiều SparkApplication trong Airflow
Khi triển khai nhiều Spark job trên Kubernetes thông qua Airflow, mỗi DAG thường yêu cầu một cấu hình SparkApplication riêng. Mặc dù các cấu hình này có nhiều điểm tương đồng (như image, driver/executor specs), việc sao chép và chỉnh sửa thủ công dẫn đến:
Lặp lại code: Mỗi DAG cần một file YAML riêng, gây khó khăn trong việc bảo trì.
Khả năng mở rộng kém: Khi số lượng DAG tăng, việc quản lý các file YAML trở nên phức tạp.
Dễ xảy ra lỗi: Chỉnh sửa thủ công dễ dẫn đến sai sót trong cấu hình.
Giải pháp là sử dụng một file template SparkApplication duy nhất, kết hợp với SparkKubernetesOperator để tùy chỉnh động các tham số cho từng DAG.
Giải pháp: Sử dụng Template SparkApplication với SparkKubernetesOperator
Chúng ta sẽ sử dụng một file template YAML cho SparkApplication và tích hợp nó với Airflow thông qua SparkKubernetesOperator. Dưới đây là các bước chi tiết:
Bước 1: Tạo file template SparkApplication
Tạo một file spark_application_template.yaml với các placeholder để thay thế các tham số động:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: {{ spark_job_name }}
namespace: default
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "{{ spark_image }}"
imagePullPolicy: Always
mainApplicationFile: "{{ main_application_file }}"
sparkVersion: "3.2.0"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: "3.2.0"
serviceAccount: spark
executor:
cores: 1
instances: {{ executor_instances }}
memory: "512m"
labels:
version: "3.2.0"
arguments:
{{ spark_arguments | tojson }}
Các placeholder như {{ spark_job_name }}, {{ spark_image }}, v.v. sẽ được thay thế bằng các giá trị cụ thể trong DAG.
Bước 2: Tích hợp với Airflow
Sử dụng SparkKubernetesOperator để áp dụng file template YAML vào Kubernetes. Dưới đây là một ví dụ DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.kubernetes_pod import KubernetesPodSensor
from datetime import datetime
from jinja2 import Template
# Hàm render template
def render_spark_application(**context):
with open('/opt/airflow/templates/spark_application_template.yaml', 'r') as file:
template = Template(file.read())
spark_params = {
'spark_job_name': f'spark-job-{context["dag_run"].run_id}',
'spark_image': 'spark:3.2.0',
'main_application_file': 's3://your-bucket/your-script.py',
'executor_instances': 2,
'spark_arguments': ['--input', 's3://your-bucket/input', '--output', 's3://your-bucket/output']
}
rendered_yaml = template.render(**spark_params)
yaml_file_path = f'/tmp/spark_application_{context["dag_run"].run_id}.yaml'
with open(yaml_file_path, 'w') as f:
f.write(rendered_yaml)
return yaml_file_path
# Định nghĩa DAG
default_args = {
'owner': 'airflow',
'start_date': datetime(2025, 5, 22),
'retries': 1,
}
with DAG(
'spark_kubernetes_dag',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
render_task = PythonOperator(
task_id='render_spark_application',
python_callable=render_spark_application,
provide_context=True,
)
spark_task = SparkKubernetesOperator(
task_id='run_spark_application',
namespace='default',
application_file='{{ ti.xcom_pull(task_ids="render_spark_application") }}',
kubernetes_conn_id='kubernetes_default',
do_xcom_push=True,
)
sensor_task = KubernetesPodSensor(
task_id='monitor_spark_application',
namespace='default',
pod_name='{{ ti.xcom_pull(task_ids="run_spark_application")["metadata"]["name"] }}',
kubernetes_conn_id='kubernetes_default',
timeout=600,
mode='reschedule',
)
render_task >> spark_task >> sensor_task
Bước 3: Mở rộng cho nhiều DAG
Để sử dụng template này cho nhiều Spark job, bạn có thể tạo các DAG động từ một danh sách tham số:
spark_jobs = [
{
'dag_id': 'spark_job_1',
'job_name': 'spark-job-1',
'main_application_file': 's3://bucket/job1.py',
'executor_instances': 2,
'arguments': ['--input', 's3://bucket/input1', '--output', 's3://bucket/output1']
},
{
'dag_id': 'spark_job_2',
'job_name': 'spark-job-2',
'main_application_file': 's3://bucket/job2.py',
'executor_instances': 3,
'arguments': ['--input', 's3://bucket/input2', '--output', 's3://bucket/output2']
}
]
for job in spark_jobs:
with DAG(job['dag_id'], default_args=default_args, schedule_interval=None) as dag:
render_task = PythonOperator(
task_id='render_spark_application',
python_callable=render_spark_application,
op_kwargs={'spark_params': {
'job_name': job['job_name'],
'spark_image': 'spark:3.2.0',
'main_application_file': job['main_application_file'],
'executor_instances': job['executor_instances'],
'spark_arguments': job['arguments']
}},
provide_context=True,
)
spark_task = SparkKubernetesOperator(
task_id='run_spark_application',
namespace='default',
application_file='{{ ti.xcom_pull(task_ids="render_spark_application") }}',
kubernetes_conn_id='kubernetes_default',
do_xcom_push=True,
)
sensor_task = KubernetesPodSensor(
task_id='monitor_spark_application',
namespace='default',
pod_name='{{ ti.xcom_pull(task_ids="run_spark_application")["metadata"]["name"] }}',
kubernetes_conn_id='kubernetes_default',
timeout=600,
mode='reschedule',
)
render_task >> spark_task >> sensor_task
Lợi ích của cách tiếp cận này
Tái sử dụng code: Một template YAML duy nhất được sử dụng cho tất cả các DAG, giảm thiểu lặp lại.
Dễ dàng mở rộng: Thêm Spark job mới chỉ cần cập nhật danh sách spark_jobs.
Tính linh hoạt: Các tham số như main_application_file, executor_instances, và spark_arguments có thể được tùy chỉnh cho từng job.
Tích hợp chặt chẽ: SparkKubernetesOperator và KubernetesPodSensor đảm bảo việc chạy và theo dõi job được thực hiện mượt mà trên Kubernetes.
Để định nghĩa cấu hình của pod trong DAG khi sử dụng SparkKubernetesOperator trong Apache Airflow, bạn có thể tùy chỉnh cấu hình pod thông qua các tham số của SparkApplication trong file template YAML hoặc trực tiếp trong DAG. SparkKubernetesOperator cho phép bạn truyền cấu hình pod thông qua file YAML hoặc các tham số bổ sung. Dưới đây là cách thực hiện chi tiết:
Giải pháp tổng quan
Sử dụng file template YAML: Tùy chỉnh cấu hình pod (như tài nguyên, nhãn, biến môi trường, v.v.) trong file template SparkApplication và render nó động trong DAG.
Tùy chỉnh pod thông qua SparkKubernetesOperator: Sử dụng các tham số của SparkKubernetesOperator để thêm cấu hình pod bổ sung nếu cần.
Tích hợp với Jinja template: Sử dụng Jinja để truyền các giá trị cấu hình pod động từ DAG vào template YAML.
Theo dõi và quản lý pod: Sử dụng KubernetesPodSensor để theo dõi trạng thái pod nếu cần.
Các bước chi tiết
1. Tạo file template SparkApplication với cấu hình pod
Tạo file spark_application_template.yaml với các placeholder cho cấu hình pod, bao gồm tài nguyên, nhãn, biến môi trường, v.v.:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: {{ spark_job_name }}
namespace: {{ namespace }}
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "{{ spark_image }}"
imagePullPolicy: Always
mainApplicationFile: "{{ main_application_file }}"
sparkVersion: "3.2.0"
restartPolicy:
type: Never
driver:
cores: {{ driver_cores }}
coreLimit: "{{ driver_core_limit }}"
memory: "{{ driver_memory }}"
labels:
app: "{{ spark_job_name }}"
{{ driver_labels | tojson | replace('"', '') | replace('{', '') | replace('}', '') }}
serviceAccount: {{ service_account }}
env:
{{ driver_env_vars | tojson }}
annotations:
{{ driver_annotations | tojson | replace('"', '') | replace('{', '') | replace('}', '') }}
executor:
cores: {{ executor_cores }}
instances: {{ executor_instances }}
memory: "{{ executor_memory }}"
labels:
app: "{{ spark_job_name }}"
{{ executor_labels | tojson | replace('"', '') | replace('{', '') | replace('}', '') }}
env:
{{ executor_env_vars | tojson }}
annotations:
{{ executor_annotations | tojson | replace('"', '') | replace('{', '') | replace('}', '') }}
arguments:
{{ spark_arguments | tojson }}
Giải thích các cấu hình pod:
driver và executor: Bạn có thể tùy chỉnh tài nguyên (cores, coreLimit, memory), nhãn (labels), biến môi trường (env), và chú thích (annotations) cho cả driver và executor pods.
Placeholder: Các giá trị như {{ driver_cores }}, {{ driver_env_vars }}, v.v. sẽ được thay thế động trong DAG.
2. Tạo DAG với cấu hình pod
Sử dụng SparkKubernetesOperator và một hàm Python để render template YAML với các cấu hình pod cụ thể. Dưới đây là ví dụ DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.kubernetes_pod import KubernetesPodSensor
from datetime import datetime
from jinja2 import Template
# Hàm render template SparkApplication
def render_spark_application(**context):
with open('/opt/airflow/templates/spark_application_template.yaml', 'r') as file:
template = Template(file.read())
# Cấu hình pod động
spark_params = {
'spark_job_name': f'spark-job-{context["dag_run"].run_id}',
'namespace': 'default',
'spark_image': 'spark:3.2.0',
'main_application_file': 's3://your-bucket/your-script.py',
'driver_cores': 1,
'driver_core_limit': '1200m',
'driver_memory': '512m',
'driver_labels': {'version': '3.2.0', 'env': 'prod'},
'driver_env_vars': [
{'name': 'ENV_VAR1', 'value': 'value1'},
{'name': 'ENV_VAR2', 'value': 'value2'}
],
'driver_annotations': {'prometheus.io/scrape': 'true'},
'executor_cores': 1,
'executor_instances': 2,
'executor_memory': '512m',
'executor_labels': {'version': '3.2.0', 'env': 'prod'},
'executor_env_vars': [
{'name': 'ENV_VAR1', 'value': 'value1'}
],
'executor_annotations': {'prometheus.io/scrape': 'true'},
'service_account': 'spark',
'spark_arguments': ['--input', 's3://your-bucket/input', '--output', 's3://your-bucket/output']
}
# Render template
rendered_yaml = template.render(**spark_params)
yaml_file_path = f'/tmp/spark_application_{context["dag_run"].run_id}.yaml'
with open(yaml_file_path, 'w') as f:
f.write(rendered_yaml)
return yaml_file_path
# Định nghĩa DAG
default_args = {
'owner': 'airflow',
'start_date': datetime(2025, 5, 22),
'retries': 1,
}
with DAG(
'spark_kubernetes_dag',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
# Task render template
render_task = PythonOperator(
task_id='render_spark_application',
python_callable=render_spark_application,
provide_context=True,
)
# Task chạy SparkApplication
spark_task = SparkKubernetesOperator(
task_id='run_spark_application',
namespace='default',
application_file='{{ ti.xcom_pull(task_ids="render_spark_application") }}',
kubernetes_conn_id='kubernetes_default',
do_xcom_push=True,
)
# Task theo dõi trạng thái
sensor_task = KubernetesPodSensor(
task_id='monitor_spark_application',
namespace='default',
pod_name='{{ ti.xcom_pull(task_ids="run_spark_application")["metadata"]["name"] }}',
kubernetes_conn_id='kubernetes_default',
timeout=600,
mode='reschedule',
)
render_task >> spark_task >> sensor_task
3. Giải thích code
Cấu hình pod trong spark_params:
driver_cores, driver_memory: Xác định tài nguyên CPU và bộ nhớ cho driver pod.
driver_labels, executor_labels: Thêm nhãn để quản lý hoặc giám sát pod (ví dụ: cho Prometheus).
driver_env_vars, executor_env_vars: Thêm các biến môi trường cho driver và executor pods.
driver_annotations, executor_annotations: Thêm chú thích để tích hợp với các công cụ bên ngoài (như monitoring).
service_account: Xác định service account mà Spark job sử dụng.
Render template: Hàm render_spark_application sử dụng Jinja2 để thay thế các placeholder trong template YAML bằng các giá trị từ spark_params.
SparkKubernetesOperator: Áp dụng file YAML đã render vào Kubernetes để tạo SparkApplication.
KubernetesPodSensor: Theo dõi trạng thái của pod driver để đảm bảo job hoàn thành.
4. Tùy chỉnh thêm cấu hình pod
Nếu bạn cần thêm các cấu hình pod phức tạp hơn (như volume mounts, node selectors, hoặc tolerations), bạn có thể mở rộng template YAML. Ví dụ:
driver:
cores: {{ driver_cores }}
coreLimit: "{{ driver_core_limit }}"
memory: "{{ driver_memory }}"
labels:
app: "{{ spark_job_name }}"
{{ driver_labels | tojson | replace('"', '') | replace('{', '') | replace('}', '') }}
serviceAccount: {{ service_account }}
env:
{{ driver_env_vars | tojson }}
annotations:
{{ driver_annotations | tojson | replace('"', '') | replace('{', '') | replace('}', '') }}
nodeSelector:
{{ driver_node_selector | tojson | replace('"', '') | replace('{', '') | replace('}', '') }}
tolerations:
{{ driver_tolerations | tojson }}
volumeMounts:
{{ driver_volume_mounts | tojson }}
volumes:
{{ volumes | tojson }}
Thêm các tham số tương ứng vào spark_params trong DAG:
spark_params = {
# ... các tham số khác ...
'driver_node_selector': {'disktype': 'ssd'},
'driver_tolerations': [
{'key': 'key1', 'operator': 'Equal', 'value': 'value1', 'effect': 'NoSchedule'}
],
'driver_volume_mounts': [
{'name': 'test-volume', 'mountPath': '/mnt/data'}
],
'volumes': [
{'name': 'test-volume', 'persistentVolumeClaim': {'claimName': 'my-pvc'}}
]
}
5. Tích hợp với nhiều DAG
Để sử dụng template này cho nhiều DAG, bạn có thể tạo các DAG động tương tự như ví dụ trước:
spark_jobs = [
{
'dag_id': 'spark_job_1',
'job_name': 'spark-job-1',
'main_application_file': 's3://bucket/job1.py',
'driver_cores': 1,
'driver_memory': '512m',
'executor_instances': 2,
'executor_memory': '512m',
'spark_arguments': ['--input', 's3://bucket/input1', '--output', 's3://bucket/output1']
},
{
'dag_id': 'spark_job_2',
'job_name': 'spark-job-2',
'main_application_file': 's3://bucket/job2.py',
'driver_cores': 2,
'driver_memory': '1g',
'executor_instances': 3,
'executor_memory': '1g',
'spark_arguments': ['--input', 's3://bucket/input2', '--output', 's3://bucket/output2']
}
]
for job in spark_jobs:
with DAG(job['dag_id'], default_args=default_args, schedule_interval=None) as dag:
render_task = PythonOperator(
task_id='render_spark_application',
python_callable=render_spark_application,
op_kwargs={'spark_params': {
'spark_job_name': f"{job['job_name']}-{{ dag_run.run_id }}",
'namespace': 'default',
'spark_image': 'spark:3.2.0',
'main_application_file': job['main_application_file'],
'driver_cores': job['driver_cores'],
'driver_core_limit': '1200m',
'driver_memory': job['driver_memory'],
'driver_labels': {'version': '3.2.0', 'env': 'prod'},
'driver_env_vars': [{'name': 'ENV_VAR1', 'value': 'value1'}],
'driver_annotations': {'prometheus.io/scrape': 'true'},
'executor_cores': 1,
'executor_instances': job['executor_instances'],
'executor_memory': job['executor_memory'],
'executor_labels': {'version': '3.2.0', 'env': 'prod'},
'executor_env_vars': [{'name': 'ENV_VAR1', 'value': 'value1'}],
'executor_annotations': {'prometheus.io/scrape': 'true'},
'service_account': 'spark',
'spark_arguments': job['spark_arguments']
}},
provide_context=True,
)
spark_task = SparkKubernetesOperator(
task_id='run_spark_application',
namespace='default',
application_file='{{ ti.xcom_pull(task_ids="render_spark_application") }}',
kubernetes_conn_id='kubernetes_default',
do_xcom_push=True,
)
sensor_task = KubernetesPodSensor(
task_id='monitor_spark_application',
namespace='default',
pod_name='{{ ti.xcom_pull(task_ids="run_spark_application")["metadata"]["name"] }}',
kubernetes_conn_id='kubernetes_default',
timeout=600,
mode='reschedule',
)
render_task >> spark_task >> sensor_task
6. Lưu ý
Cài đặt plugin: Đảm bảo cài đặt apache-airflow-providers-cncf-kubernetes (pip install apache-airflow-providers-cncf-kubernetes).
Kubernetes connection: Cấu hình kubernetes_default trong Airflow Connections.
Quyền truy cập: Service account spark cần có quyền tạo và quản lý SparkApplication và pod trong namespace.
Lưu trữ file tạm thời: File YAML tạm được lưu vào /tmp. Đảm bảo Airflow worker có quyền ghi vào thư mục này.
Tùy chỉnh thêm: Bạn có thể thêm các cấu hình như sparkConf, initContainers, hoặc sidecarContainers vào template YAML nếu cần.
Bằng cách định nghĩa cấu hình pod trong file template SparkApplication và render nó động trong DAG, bạn có thể tùy chỉnh tài nguyên, nhãn, biến môi trường, và các thuộc tính khác của pod một cách linh hoạt. Sử dụng SparkKubernetesOperator giúp đơn giản hóa việc triển khai, và cách tiếp cận này dễ dàng mở rộng cho nhiều DAG
Subscribe to my newsletter
Read articles from Kilo directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
