Run dbt with Cloud Composer and Cloud Run
Recently, a client wanted to use dbt Core in Cloud Composer (Airflow) but ran into Python dependency conflicts. To work around this, I built a solution that runs dbt inside Cloud Run, with Cloud Composer as the orchestrator.
The workflow looks like this:
Cloud Composer -> BashOperator triggers a Cloud Run service -> Cloud Run container runs the dbt CLI
Below is how it works:
Cloud Run service:
When the Cloud Run service is triggered (via an HTTP request), it runs a bash script (script.sh).
Dockerfile
FROM golang:1.18-buster as builder
WORKDIR /app
COPY go.* ./
RUN go mod download
COPY invoke.go ./
RUN go build -mod=readonly -v -o server
# Use the official dbt-bigquery image for running
# https://github.com/dbt-labs/dbt-bigquery/pkgs/container/dbt-bigquery
FROM ghcr.io/dbt-labs/dbt-bigquery:1.4.1
WORKDIR /
COPY --from=builder /app/server /app/server
COPY script.sh ./
ENTRYPOINT ["/app/server"]
invoke.go (source code from Google's Cloud Run shell service sample)
package main

import (
	"log"
	"net/http"
	"os"
	"os/exec"
)

func main() {
	http.HandleFunc("/", scriptHandler)

	// Determine port for HTTP service.
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	// Start HTTP server.
	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// scriptHandler runs script.sh and returns its stdout as the response body.
// The script's stderr goes to the container's stderr.
func scriptHandler(w http.ResponseWriter, r *http.Request) {
	cmd := exec.CommandContext(r.Context(), "/bin/bash", "script.sh")
	cmd.Stderr = os.Stderr
	out, err := cmd.Output()
	if err != nil {
		// A failed or non-zero exit of script.sh becomes HTTP 500.
		log.Printf("script.sh failed: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
	}
	w.Write(out)
}
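The contract between invoke.go and script.sh is simple: the script's stdout becomes the response body, and a non-zero exit code becomes HTTP 500. Here is a minimal Python standard-library sketch of the same idea to make that concrete. It is only an illustration, not part of the deployed service. `make_script_handler` and `serve` are hypothetical names. Unlike `exec.CommandContext`, this sketch does not kill the script if the client disconnects.

```python
import subprocess
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


def make_script_handler(command):
    """Return a handler class that runs `command` on every GET request.

    Mirrors scriptHandler in invoke.go: stdout becomes the response body,
    stderr is inherited from the server process, and a non-zero exit
    code turns into HTTP 500.
    """

    class ScriptHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            # stderr is not redirected, so it goes to the server's stderr,
            # like cmd.Stderr = os.Stderr in the Go version.
            result = subprocess.run(command, stdout=subprocess.PIPE)
            status = 200 if result.returncode == 0 else 500
            self.send_response(status)
            self.send_header("Content-Length", str(len(result.stdout)))
            self.end_headers()
            self.wfile.write(result.stdout)

    return ScriptHandler


def serve(command, port=8080):
    """Block forever, serving `command` on the given port (hypothetical helper)."""
    server = ThreadingHTTPServer(("", port), make_script_handler(command))
    server.serve_forever()
```

Calling `serve(["/bin/bash", "script.sh"], port)` with the port taken from `$PORT` would roughly mirror the Go server. The 500-on-failure behavior is what lets a caller tell a failed dbt run from a successful one.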
script.sh (just an example of running dbt):
dbt --version
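I followed Google's guide for deploying a Cloud Run shell service. From the project directory, deploy with the command below (--source=. builds the container image from the Dockerfile using Cloud Build):
gcloud run deploy run-dbt --no-allow-unauthenticated --region=us-central1 --source=.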
[Screenshot: terminal output of gcloud run deploy]
Airflow DAG:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}

dag = DAG(
    "trigger_cloud_run_dag",
    description="trigger_cloud_run_dag",
    default_args=default_args,  # without this, retries and email settings are ignored
    schedule_interval="0 3 * * *",
    start_date=datetime(2023, 4, 19),
    catchup=False,
    tags=["custom"],
)

# Call the Cloud Run service with an identity token for the worker's service account.
# --fail makes curl exit non-zero on HTTP errors (e.g. 403 or 500), so the task
# fails when dbt fails instead of succeeding silently.
trigger_cloud_run = BashOperator(
    task_id="trigger_cloud_run",
    bash_command='curl --fail -H "Authorization: Bearer $(gcloud auth print-identity-token)" https://run-dbt-7ensisdq5q-uc.a.run.app',
    do_xcom_push=True,
    dag=dag,
)

trigger_cloud_run
To keep it secure, the Cloud Run service requires IAM authentication (it was deployed with --no-allow-unauthenticated). Therefore, I granted my Cloud Composer worker service account the roles/run.invoker role.
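The curl call in the BashOperator does two things. It obtains an identity token for the worker's service account and sends it as a Bearer token. Cloud Run then checks that the caller holds roles/run.invoker. Here is a minimal standard-library Python sketch of that client side. It is not the author's code. `fetch_identity_token`, `invoke_cloud_run` and the 600-second timeout are hypothetical, and only the `gcloud auth print-identity-token` command comes from the DAG above.

```python
import subprocess
import urllib.request


def fetch_identity_token():
    """Get the same token the BashOperator gets from gcloud (requires gcloud)."""
    result = subprocess.run(
        ["gcloud", "auth", "print-identity-token"],
        check=True,
        capture_output=True,
        text=True,
    )
    return result.stdout.strip()


def invoke_cloud_run(url, token, timeout=600):
    """GET the service with a Bearer token and return the response body.

    urllib raises urllib.error.HTTPError for 4xx/5xx responses, so a 403
    (caller lacks roles/run.invoker) or a 500 (script.sh failed) surfaces
    as an exception rather than being treated as success.
    `timeout` is a per-socket-operation limit in seconds (assumed value).
    """
    request = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode()
```

In a real environment you would call `invoke_cloud_run(service_url, fetch_identity_token())`. Raising on non-2xx responses plays the same role as curl's `--fail` flag.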
[Screenshot: Airflow task logs]
[Screenshot: Cloud Run logs showing the dbt CLI output]
I've uploaded the code to a GitHub repo. Feel free to let me know if you need any help running it.
Written by Derrick Qin