Airflow Scheduler – Bộ Não Điều Phối Hệ Thống Data Pipeline

KiloKilo
7 min read

Apache Airflow đã trở thành công cụ orchestration phổ biến nhất hiện nay để quản lý và tự động hóa data pipeline.
Trong hệ sinh thái Airflow, Scheduler chính là bộ não của hệ thống – quyết định khi nàotask nào sẽ được chạy.

Airflow Scheduler là một thành phần cốt lõi của Apache Airflow, chịu trách nhiệm lập kế hoạch và điều phối các DAGs (Directed Acyclic Graphs) – các quy trình xử lý dữ liệu được định nghĩa trong Airflow. Nó đảm bảo rằng các task (tác vụ) trong DAG được thực thi đúng thời điểm, đúng thứ tự, và tuân thủ các phụ thuộc được định nghĩa.

Nói một cách đơn giản, Airflow Scheduler giống như một người quản lý dự án thông minh, luôn theo dõi lịch trình, phân bổ tài nguyên, và đảm bảo mọi thứ diễn ra theo đúng kế hoạch.

1. Scheduler là gì?

Scheduler trong Airflow là tiến trình chịu trách nhiệm:

  • Đọc và phân tích DAGs (Directed Acyclic Graphs)

  • Xác định thời điểm chạy DAG dựa trên schedule_interval, start_date, end_date

  • Đánh giá điều kiện phụ thuộc (depends_on_past, trigger_rule, upstream/downstream tasks)

  • Gửi task đủ điều kiện vào hàng đợi Executor

  • Theo dõi và cập nhật trạng thái task trong Metadata Database

📌 Điểm quan trọng: Scheduler không trực tiếp chạy task. Nó chỉ ra quyết địnhđiều phối task sang Executor.

Airflow Scheduler hoạt động dựa trên một vòng lặp liên tục, thực hiện các công việc sau:

  1. Quét DAGs: Scheduler liên tục quét thư mục chứa các tệp DAG (thường là thư mục dags/ trong Airflow) để phát hiện các DAG mới hoặc các thay đổi trong DAG hiện tại.

  2. Lập lịch thực thi: Scheduler phân tích các tham số lịch trình (schedule_interval) của từng DAG, chẳng hạn như chạy hàng giờ, hàng ngày, hoặc theo lịch biểu phức tạp hơn (ví dụ: CRON expression). Nó xác định thời điểm nào một DAG nên được kích hoạt.

  3. Quản lý phụ thuộc: Scheduler kiểm tra các phụ thuộc giữa các task trong DAG (ví dụ: task B chỉ chạy sau khi task A hoàn thành) và đảm bảo rằng các task được thực thi theo đúng thứ tự.

  4. Phân bổ tài nguyên: Scheduler gửi các task đã sẵn sàng đến Executor (một thành phần khác của Airflow) để thực thi. Tùy thuộc vào cấu hình, Executor có thể là LocalExecutor, CeleryExecutor, hoặc KubernetesExecutor, mỗi loại phù hợp với các môi trường khác nhau.

  5. Giám sát và xử lý lỗi: Scheduler theo dõi trạng thái của các task (chạy, thành công, thất bại, v.v.) và thực hiện các hành động như thử lại (retry) nếu task thất bại hoặc gửi thông báo qua email/Slack khi cần.

Tại sao Airflow Scheduler quan trọng?

Airflow Scheduler đóng vai trò trung tâm trong việc đảm bảo các data pipeline hoạt động hiệu quả. Dưới đây là một số lý do chính:

  • Tự động hóa quy trình: Scheduler loại bỏ nhu cầu chạy thủ công các tác vụ, giúp tiết kiệm thời gian và giảm nguy cơ lỗi do con người.

  • Khả năng mở rộng: Với các Executor phù hợp, Scheduler có thể quản lý hàng nghìn task trên nhiều máy chủ, phù hợp với các hệ thống dữ liệu lớn.

  • Xử lý linh hoạt: Scheduler hỗ trợ các lịch biểu phức tạp và có thể xử lý các DAG với hàng trăm task có phụ thuộc phức tạp.

  • Khả năng khôi phục: Trong trường hợp hệ thống gặp sự cố, Scheduler có thể tiếp tục từ điểm dừng trước đó nhờ cơ chế lưu trữ trạng thái trong cơ sở dữ liệu (Metadata Database).

📌 Ví dụ minh họa

  1. Bạn có một DAG chạy mỗi giờ (schedule_interval="0 * * * *").

  2. Scheduler quét DAG folder → thấy đã đến giờ → tạo DAG Run mới.

  3. Kiểm tra điều kiện task → gửi task đủ điều kiện sang Executor.

  4. Executor chạy task → trả kết quả về Metadata DB.

  5. Scheduler update trạng thái → quyết định chạy task tiếp theo.

Kiến trúc Airflow trên Kubernetes (dùng PgBouncer)

                 ┌─────────────────────────┐
                        DAG Folder         
                  (PVC / Git Sync Sidecar) 
                 └───────────┬─────────────┘
                             
                             
 ┌──────────────────────────────────────────────────────┐
                     Scheduler Pod                      
 │-------------------------------------------------------│
   - Quét DAGs  quyết định khi nào chạy               
   - Gửi task sang Executor (K8s / Celery / Local)      
   - Ghi trạng thái DAG/Task vào Metadata DB            
   - Đọc config từ airflow.cfg / env                    
 └───────────┬──────────────────────┬───────────────────┘
                                   
   Heartbeat                        SQL Queries
                                   
       ┌────────────┐         ┌──────────────┐
         Webserver            PgBouncer   
          (UI)               (Connection  
                               Pooler)    
       └─────┬──────┘         └──────┬───────┘
                                    
              SQL Queries           
                                    
       ┌─────────────────────────────────┐
              PostgreSQL Metadata DB    
          (Lưu DAG run, task status)    
       └─────────────────────────────────┘


    Trigger Tasks         Task Status Updates
                                    
                                    
 ┌──────────────────────────────────────────────────────┐
                   Kubernetes Executor                  
 │-------------------------------------------------------│
   - Nhận lệnh từ Scheduler để tạo Pod cho từng Task    
   - Mỗi Task chạy trong 1 Pod riêng biệt                
   - Kết quả gửi về Metadata DB qua PgBouncer           
 └──────────────────────────────────────────────────────┘

2. Cách Scheduler hoạt động

Quy trình làm việc của Scheduler diễn ra liên tục theo chu kỳ:

  1. Quét DAGs

    • Đọc DAG từ thư mục dags_folder hoặc từ DB (nếu bật store_serialized_dags).

    • Parse code Python để hiểu cấu trúc DAG và các task bên trong.

  2. Đánh giá thời điểm chạy DAG

    • So sánh thời gian hiện tại với lịch schedule_interval của DAG.

    • Nếu đủ điều kiện → tạo DAG Run mới.

  3. Xác định task đủ điều kiện chạy

    • Kiểm tra dependency và trigger rule.

    • Task nào sẵn sàng → đưa vào hàng đợi Executor.

  4. Gửi task cho Executor

    • Executor có thể là:

      • KubernetesExecutor: tạo Pod riêng cho từng task.

      • CeleryExecutor: gửi task cho Celery worker.

      • LocalExecutor: chạy task ngay trên node của Scheduler.

  5. Theo dõi tiến trình task

    • Scheduler cập nhật trạng thái (running, success, failed) vào Metadata DB.

    • Nếu task cần retry → gửi lại vào Executor.

3. Scheduler, Executor và Metadata DB

Một hệ thống Airflow tiêu chuẩn sẽ gồm:

  • Scheduler – Bộ não ra quyết định

  • Executor – Cánh tay thực thi (tạo Pod/worker để chạy task)

  • Metadata DB – Bộ nhớ lưu trữ trạng thái DAG/Task

Sơ đồ luồng dữ liệu:

Scheduler  quyết định chạy Task  gửi Executor  chạy Task  ghi kết quả vào Metadata DB

Khi chạy trên Kubernetes, mô hình thường như sau:

[Scheduler]  [KubernetesExecutor]  [Pod chạy task]  [Metadata DB (PostgreSQL)]

4. Các thông số cấu hình quan trọng

Khi Scheduler xử lý nhiều DAG và task, bạn cần tinh chỉnh các thông số sau (trong airflow.cfg hoặc Helm chart):

Tham sốGợi ý khi tải lớnÝ nghĩa
parallelism256–512Số task tối đa chạy đồng thời trên toàn hệ thống
dag_concurrency32–64Số task tối đa chạy trong một DAG
max_active_runs_per_dag4–8Số DAG run active tối đa
max_threads8–16Số luồng xử lý scheduler
parsing_processes2–4Số process parse DAG song song
scheduler_heartbeat_sec5Chu kỳ heartbeat
store_serialized_dagsTrueLưu DAG đã parse vào DB để giảm tải

5. Kinh nghiệm tối ưu Scheduler trên Kubernetes

  1. Tăng tài nguyên Pod Scheduler

    • CPU: 2–4 cores

    • RAM: 4–8 GB

  2. Chạy nhiều Scheduler replicas (Airflow 2.0+)

    • Giúp chia tải DAG parsing và trigger nhanh hơn.

    • Bật AIRFLOW__SCHEDULER__USE_ROW_LEVEL_LOCKING=True để tránh conflict DB.

  3. Dùng PgBouncer cho kết nối DB

    • Giảm áp lực kết nối từ nhiều Scheduler/Worker/Webserver.
  4. Bật Serialized DAG

    • Giảm CPU cho việc parse DAG.
  5. Theo dõi log và metrics của Scheduler

    • scheduler_heartbeat

    • dag_processing.import_errors

    • task_instance_created

Một số mẹo tối ưu hóa Airflow Scheduler

Để đảm bảo Scheduler hoạt động hiệu quả, bạn có thể áp dụng các mẹo sau:

  1. Tối ưu hóa cấu hình DAG: Đảm bảo rằng các DAG được viết tối ưu, tránh lặp lại công việc không cần thiết và sử dụng các tham số hợp lý cho schedule_interval.

  2. Sử dụng Executor phù hợp: Nếu bạn có khối lượng công việc lớn, hãy cân nhắc sử dụng CeleryExecutor hoặc KubernetesExecutor để phân phối tải.

  3. Tăng cường tài nguyên: Đảm bảo máy chủ chạy Scheduler có đủ CPU và bộ nhớ, đặc biệt khi quản lý nhiều DAG hoặc task.

  4. Giám sát và bảo trì: Sử dụng các công cụ như Airflow Webserver hoặc các giải pháp giám sát bên ngoài để theo dõi hiệu suất của Scheduler và phát hiện các vấn đề tiềm ẩn.

  5. Cập nhật cơ sở dữ liệu: Metadata Database (thường là PostgreSQL hoặc MySQL) cần được tối ưu hóa để tránh tình trạng tắc nghẽn khi Scheduler truy vấn thường xuyên.

0
Subscribe to my newsletter

Read articles from Kilo directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Kilo
Kilo