Building a GenAI Book Recommendation Pipeline with Airflow

Table of contents
- Project Setup (from GitHub)
- Environment Configuration
- Dockerized Runtime
- Embedding Model
- What the Pipeline Does
- Part 1: Building the Ingestion + Embedding DAG (fetch_data.py)
- Custom Failure Callback
- DAG Definition: fetch_data
- Task 1: Create Collection if Not Exists
- Task 2: List Input Files
- Task 3: Transform Book Description Files
- What It Does
- Step-by-Step Breakdown
- Output
- Task 4: Create Vector Embeddings
- What It Does
- Step-by-Step Breakdown
- Task 5: Load Embeddings to Weaviate Vector DB
- Step-by-Step Breakdown
- Part 2: Querying Books Using Semantic Search (query_data.py)
As Large Language Models (LLMs) evolve, building smart AI systems isn’t just about prompt engineering anymore — it's about connecting data pipelines, orchestrating multi-stage tasks, and deploying reliable, scalable applications. This is where GenAI meets MLOps.
In this project, I built a production-style pipeline that powers a semantic book recommendation system using:
- Apache Airflow 3.0 for orchestration
- Weaviate for vector-based search
- fastembed to generate sentence embeddings using BAAI/bge-small-en-v1.5
- Python scripts executed via Visual Studio Code
The system works on a Retrieval-Augmented Generation (RAG) pattern:
- It ingests book descriptions from .txt files
- Embeds them into high-dimensional vectors
- Stores them in Weaviate (a vector DB)
- And retrieves top matches for natural language queries like: "I'm looking for a philosophical book on the nature of reality."
This blog documents the full pipeline — from ingest to query — built in a modular, testable way. Whether you're exploring GenAI, MLOps, or just want to get hands-on with vector databases and Airflow DAGs, this walkthrough will show you how to connect the dots.
Project Setup (from GitHub)
This repository is structured to be modular, CI/CD-ready, and orchestrated via Astro CLI:
.
├── dags/ # Contains fetch_data.py and query_data.py DAGs
├── include/data/ # Book description text files (inputs)
├── src/img/ # Images (optional for DAG visualization)
├── tests/dags/ # Pytest-ready DAG tests
├── Dockerfile # Builds the Airflow runtime
├── docker-compose.override.yml
├── .env_example # Template for connection env vars
├── requirements.txt # Python dependencies
└── README.md
Environment Configuration
To connect Airflow to your local Weaviate instance, the .env file contains:
AIRFLOW_CONN_MY_WEAVIATE_CONN='{
    "conn_type": "weaviate",
    "host": "weaviate",
    "port": "8081",
    "extra": {
        "token": "adminkey",
        "additional_headers": {
            "X-Openai-Api-Key": "<YOUR OPENAI API KEY>"
        },
        "grpc_port": "50051",
        "grpc_host": "weaviate",
        "grpc_secure": "False",
        "http_secure": "False"
    }
}'
This config tells Airflow how to:
- Reach Weaviate (hosted via Docker)
- Authenticate using a token
- Use plain HTTP and gRPC locally
- Support semantic search and LLM-enhanced queries (if enabled)
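Before moving on, a quick connection smoke test can be handy. This is a minimal sketch, assuming the my_weaviate_conn connection above is loaded into Airflow and the weaviate-client v4 library is installed:

from airflow.providers.weaviate.hooks.weaviate import WeaviateHook

# Sanity-check the connection defined in .env (assumes the Airflow
# environment and the Dockerized Weaviate instance are both running).
hook = WeaviateHook("my_weaviate_conn")
client = hook.get_conn()
print(client.is_ready())  # True when the Weaviate instance is reachable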
Dockerized Runtime
You're not using weaviate.connect_to_embedded() (as in Jupyter notebooks). Instead, the Weaviate DB runs via Docker. It's accessible at http://weaviate:8081 and managed by Airflow via the .env connection above.
To bring up your full system:
astro dev start
Embedding Model
The model used for embedding book descriptions is:
EMBEDDING_MODEL_NAME = "BAAI/bge-small-en-v1.5"
This model is loaded using fastembed and converts text descriptions into high-dimensional vectors for semantic search.
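To see what this produces in isolation, here is a minimal sketch (the sample sentence is invented for illustration):

from fastembed import TextEmbedding

# Embed a single invented sentence and inspect the vector size.
model = TextEmbedding("BAAI/bge-small-en-v1.5")
vector = next(model.embed(["A philosophical book on the nature of reality."]))
print(len(vector))  # 384 dimensions for bge-small-en-v1.5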
What the Pipeline Does
fetch_data.py: Data Ingestion & Vectorization
This DAG does the following:
- Connects to Weaviate and checks/creates the "Books" collection
- Reads book .txt files in include/data
- Parses metadata (title, author, description)
- Embeds descriptions using fastembed
- Inserts the data + vector embeddings into Weaviate
Each block in this DAG is defined using Airflow's @task decorator and is retryable and chainable.
query_data.py: Natural Language Query Search
This DAG does:
- Waits for the vector asset my_book_vector_data
- Accepts a query_str parameter (e.g., "A philosophical book")
- Embeds that query
- Queries Weaviate for the most similar book vector
- Logs the title, author, and description of the top match
Part 1: Building the Ingestion + Embedding DAG (fetch_data.py)
This part of the project handles everything from reading raw .txt files to embedding and storing them in a vector database (Weaviate). The orchestration is handled entirely by the Apache Airflow 3.0 SDK.
Let's break down the entire fetch_data.py script line by line.
Imports and Constants
from airflow.sdk import chain, dag, task, Asset
from pendulum import datetime, duration
Explanation:
- from airflow.sdk: Imports Airflow's modern Python SDK, introduced in Airflow 3.x. It allows defining DAGs and tasks using decorators rather than writing boilerplate code.
- dag: A decorator to define a DAG (Directed Acyclic Graph), which is a set of tasks and dependencies.
- task: A decorator to define individual operations (Python functions) that run as isolated Airflow tasks.
- chain: Helps explicitly set the order of task execution (e.g., A → B).
- Asset: Declares outputs from a task — these can be tracked and used as triggers in downstream DAGs.
- from pendulum: A timezone-aware datetime library used internally by Airflow to avoid issues with naive datetimes.
- datetime: Used to set start times for the DAG.
- duration: Used to define retry intervals, etc.
Project-Level Constants
COLLECTION_NAME = "Books"
BOOK_DESCRIPTION_FOLDER = "include/data"
EMBEDDING_MODEL_NAME = "BAAI/bge-small-en-v1.5"
Explanation:
These constants configure the behavior of the pipeline.
- COLLECTION_NAME: The name of the Weaviate collection (like a table) where vectorized book data will be stored.
- BOOK_DESCRIPTION_FOLDER: Relative directory path where the .txt files containing raw book metadata are kept.
- EMBEDDING_MODEL_NAME: Refers to a HuggingFace-compatible embedding model used by the fastembed library. This model converts text into semantic vectors.
Custom Failure Callback
def _my_callback_func(context):
    task_instance = context["task_instance"]
    dag_run = context["dag_run"]
    print(
        f"CALLBACK: Task {task_instance.task_id} "
        f"failed in DAG {dag_run.dag_id} at {dag_run.start_date}"
    )
This is a failure handler. If any task fails, this function will be executed.
- context: A dictionary with details about the current DAG run and task instance.
It logs:
- The task ID that failed
- The DAG in which it failed
- The time when the failure occurred
This is useful for debugging failures in production and is a good Airflow best practice.
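The same callback shape can do more than print. As a sketch (not part of the project), here is a variant that posts the failure details to a hypothetical webhook endpoint:

import json
import urllib.request

def _alerting_callback(context):
    # Same context keys as above; the endpoint URL is a placeholder.
    payload = {
        "task_id": context["task_instance"].task_id,
        "dag_id": context["dag_run"].dag_id,
        "start_date": str(context["dag_run"].start_date),
    }
    request = urllib.request.Request(
        "https://example.com/airflow-alerts",  # hypothetical endpoint
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request)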
DAG Definition: fetch_data
@dag(
    start_date=datetime(2025, 6, 1),
    schedule="@hourly",
    default_args={
        "retries": 1,
        "retry_delay": duration(seconds=10),
        "on_failure_callback": _my_callback_func,
    },
    on_failure_callback=_my_callback_func,
)
def fetch_data():
def fetch_data():
This declares the fetch_data DAG and schedules it to run once every hour (@hourly). The DAG has:
- start_date: Sets the point in time after which the DAG becomes active.
- retries: Configures retry attempts if a task fails.
- retry_delay: How long to wait before retrying a failed task.
- on_failure_callback: Points to the custom failure handler we just defined.
From this point on, every function inside this block — decorated with @task — becomes an Airflow task that runs as part of this DAG.
Task 1: Create Collection if Not Exists
@task(retries=5, retry_delay=duration(seconds=2))
def create_collection_if_not_exists() -> None:
    from airflow.providers.weaviate.hooks.weaviate import WeaviateHook

    hook = WeaviateHook("my_weaviate_conn")
    client = hook.get_conn()
    existing_collections = client.collections.list_all()
    existing_collection_names = existing_collections.keys()
    if COLLECTION_NAME not in existing_collection_names:
        print(f"Collection {COLLECTION_NAME} does not exist yet. Creating it...")
        collection = client.collections.create(name=COLLECTION_NAME)
        print(f"Collection {COLLECTION_NAME} created successfully.")
        print(f"Collection details: {collection}")
This task is responsible for setting up the Weaviate collection where data will be stored.
- WeaviateHook: Provided by Airflow's Weaviate provider; abstracts away connection management.
- hook.get_conn(): Returns an active Weaviate Python client, preconfigured using the .env file connection.
- collections.list_all(): Lists existing collections in the vector DB.
- If the "Books" collection does not exist, it is created with client.collections.create(name=COLLECTION_NAME).
Retry logic is built-in — this task retries up to 5 times if it fails.
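Creating the collection with only a name relies on Weaviate's auto-schema. If you preferred an explicit schema, weaviate-client v4 supports declaring properties up front; a sketch, not part of the original DAG:

from weaviate.classes.config import Property, DataType

# Alternative to auto-schema: declare the three text properties explicitly.
# Uses the `client` and COLLECTION_NAME already defined in the task above.
client.collections.create(
    name=COLLECTION_NAME,
    properties=[
        Property(name="title", data_type=DataType.TEXT),
        Property(name="author", data_type=DataType.TEXT),
        Property(name="description", data_type=DataType.TEXT),
    ],
)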
Task 2: List Input Files
@task
def list_book_description_files() -> list:
    import os

    book_description_files = [
        f for f in os.listdir(BOOK_DESCRIPTION_FOLDER) if f.endswith(".txt")
    ]
    return book_description_files
This task scans the include/data/ directory and returns a list of .txt files. These files contain the raw book descriptions in a custom format.
It uses:
- os.listdir() to get all files in the folder
- str.endswith() to keep only .txt files
This list is passed downstream to the transform tasks using dynamic task mapping, as sketched below.
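The mapping call itself is not shown in the snippets; inside fetch_data() it typically looks like this sketch (the variable names are assumptions, not copied from the original script):

# One mapped task instance is created per .txt file returned above.
_files = list_book_description_files()
_book_data = transform_book_description_files.expand(
    book_description_file=_files
)
_embeddings = create_vector_embeddings.expand(book_data=_book_data)
_load_embeddings_to_vector_db = load_embeddings_to_vector_db(
    list_of_book_data=_book_data,
    list_of_description_embeddings=_embeddings,
)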
Task 3: Transform Book Description Files
@task
def transform_book_description_files(book_description_file: str) -> list:
    import os

    with open(
        os.path.join(BOOK_DESCRIPTION_FOLDER, book_description_file), "r"
    ) as f:
        book_descriptions = f.readlines()

    titles = [
        book_description.split(":::")[1].strip()
        for book_description in book_descriptions
    ]
    authors = [
        book_description.split(":::")[2].strip()
        for book_description in book_descriptions
    ]
    book_description_text = [
        book_description.split(":::")[3].strip()
        for book_description in book_descriptions
    ]

    book_descriptions = [
        {
            "title": title,
            "author": author,
            "description": description,
        }
        for title, author, description in zip(
            titles, authors, book_description_text
        )
    ]
    return book_descriptions
What It Does
This task reads and parses each book description .txt file into structured Python dictionaries. It processes each line in the file, which is formatted like:
0 ::: Book Title ::: Author ::: Description of the book
Step-by-Step Breakdown
- os.path.join(...): Constructs a safe file path to the input file.
- readlines(): Reads all lines in the file into a list (each line = one book).
- Each line is split using .split(":::"), extracting: index (not used), title, author, and description.
- The three fields are cleaned with .strip() to remove whitespace.
- zip(...): Combines the three separate lists into dictionaries like:
{
    "title": "Book Title",
    "author": "Author Name",
    "description": "This is a detailed description..."
}
Output
A list of Python dictionaries — each representing a book with its metadata. This gets passed to the embedding step.
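To make the parsing concrete, here is a standalone sketch of the same logic (the sample line is invented, not taken from the project's data files):

# Demonstrates the ::: parsing on a single invented line.
line = "0 ::: Meditations ::: Marcus Aurelius ::: Personal writings on Stoic philosophy."
index, title, author, description = [part.strip() for part in line.split(":::")]
book = {"title": title, "author": author, "description": description}
print(book)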
Task 4: Create Vector Embeddings
@task
def create_vector_embeddings(book_data: list) -> list:
    from fastembed import TextEmbedding

    embedding_model = TextEmbedding(EMBEDDING_MODEL_NAME)
    book_descriptions = [book["description"] for book in book_data]
    description_embeddings = [
        list(map(float, next(embedding_model.embed([desc]))))
        for desc in book_descriptions
    ]
    return description_embeddings
What It Does
This task converts each book's description into a semantic vector using the fastembed library and the model configured earlier.
Step-by-Step Breakdown
- TextEmbedding(...): Initializes the HuggingFace embedding model "BAAI/bge-small-en-v1.5" via fastembed.
- [book["description"] for book in book_data]: Extracts all descriptions from the input list of dictionaries.
- The core line, list(map(float, next(embedding_model.embed([desc])))), does the following:
  - embedding_model.embed([desc]): Generates the embedding for the description (returns a generator).
  - next(...): Extracts the first (and only) result from the generator.
  - map(float, ...): Converts each vector component to a float (ensuring compatibility).
  - list(...): Converts the final result into a list of floats.
Each embedding is a dense vector, typically of dimension 384 for this model.
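As a side note, fastembed can also embed the whole batch in one call, which avoids re-invoking the generator per description. A minimal alternative sketch, using the same variables as the task above:

# Batched alternative: one embed() call over all descriptions.
description_embeddings = [
    list(map(float, vector))
    for vector in embedding_model.embed(book_descriptions)
]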
Task 5: Load Embeddings to Weaviate Vector DB
@task(outlets=[Asset("my_book_vector_data")], trigger_rule="all_done")
def load_embeddings_to_vector_db(
    list_of_book_data: list, list_of_description_embeddings: list
) -> None:
    from airflow.providers.weaviate.hooks.weaviate import WeaviateHook
    from weaviate.classes.data import DataObject

    hook = WeaviateHook("my_weaviate_conn")
    client = hook.get_conn()
    collection = client.collections.get(COLLECTION_NAME)

    for book_data_list, emb_list in zip(
        list_of_book_data, list_of_description_embeddings
    ):
        items = []
        for book_data, emb in zip(book_data_list, emb_list):
            item = DataObject(
                properties={
                    "title": book_data["title"],
                    "author": book_data["author"],
                    "description": book_data["description"],
                },
                vector=emb,
            )
            items.append(item)
        collection.data.insert_many(items)
Step-by-Step Breakdown
- WeaviateHook(...): Connects to your Docker-hosted Weaviate (via .env).
- client.collections.get(...): Gets the active collection to insert data into.
- zip(...): Pairs each list of book data with its corresponding list of embeddings.
- DataObject(...): Each book + vector becomes a DataObject for Weaviate, with properties (structured metadata) and vector (the actual embedding).
Then collection.data.insert_many(items) pushes the entire list of items into the vector DB at once.
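If you want to confirm the batch landed cleanly, insert_many in weaviate-client v4 returns a result object you can inspect. A hedged sketch:

# Check the batch result for per-object failures instead of assuming success.
result = collection.data.insert_many(items)
if result.has_errors:
    for index, error in result.errors.items():
        print(f"Object {index} failed: {error.message}")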
This task also declares:
@task(outlets=[Asset("my_book_vector_data")])
So it emits an asset, which is used to trigger the next DAG: query_data.py.
Task Chain: Enforce Ordering
chain(_create_collection_if_not_exists, _load_embeddings_to_vector_db)
Ensures that the Weaviate collection creation completes before any embeddings are inserted.
Other tasks (like transform and embed) are mapped and executed in parallel, so you don’t need to manually chain them.
Final Step: Call the DAG Function
fetch_data()
This executes the DAG definition so that Airflow can parse and register it in the DAGs list.
Part 2: Querying Books Using Semantic Search (query_data.py)
Code Walkthrough — Line by Line
from airflow.sdk import dag, task, Asset
Imports the new Python SDK tools for defining DAGs and tasks.
COLLECTION_NAME = "Books"
EMBEDDING_MODEL_NAME = "BAAI/bge-small-en-v1.5"
These match the values used in the fetch_data.py DAG.
You must use the same embedding model to generate consistent vectors for both storage and querying. COLLECTION_NAME points to the existing collection of book objects.
DAG Definition: query_data
@dag(
    schedule=[Asset("my_book_vector_data")],
    params={"query_str": "A philosophical book"},
)
def query_data():
- schedule=[Asset("my_book_vector_data")]: This DAG runs only when the asset is updated — i.e., when the book data is loaded by fetch_data.py. This makes it an event-triggered DAG (no time-based schedule needed).
- params={"query_str": "A philosophical book"}: Sets a default parameter, making the DAG parameterized — users can override it in the Airflow UI.
You can also trigger this DAG manually and change the query string to something like "Show me a book on quantum physics".
Task: Search Vector DB for a Book
@task
def search_vector_db_for_a_book(**context):
This task:
- Reads the query string from params
- Embeds the query
- Performs a nearest neighbor search in Weaviate
- Prints the most relevant book result
Inside the Task — Line by Line
from airflow.providers.weaviate.hooks.weaviate import WeaviateHook
from fastembed import TextEmbedding
query_str = context["params"]["query_str"]
- WeaviateHook: Connects to your running Weaviate instance.
- TextEmbedding: Loads the same embedding model (bge-small-en-v1.5).
- query_str = context["params"]["query_str"]: Grabs the dynamic input string, i.e., the search phrase from the DAG's input parameters.
hook = WeaviateHook("my_weaviate_conn")
client = hook.get_conn()
Instantiates a Weaviate client using Airflow's connection settings from .env.
embedding_model = TextEmbedding(EMBEDDING_MODEL_NAME)
collection = client.collections.get(COLLECTION_NAME)
Loads the same embedding model you used to vectorize the book descriptions, and retrieves the "Books" collection from the vector DB.
query_emb = list(embedding_model.embed([query_str]))[0]
Converts the input string into a semantic vector (same format/dimension as the book vectors).
Weaviate uses this vector for nearest neighbor search.
results = collection.query.near_vector(
    near_vector=query_emb,
    limit=1,
)
Executes the search.
- near_vector: tells Weaviate to search for the most similar vector.
- limit=1: returns only the top result.
Print the Result
for result in results.objects:
    print(
        f"You should read: {result.properties['title']} by {result.properties['author']}"
    )
    print("Description:")
    print(result.properties["description"])
Iterates through the results (even though there's only one) and prints:
- Book title
- Author
- Full description
Call the DAG Function
query_data()
Want to Try It Yourself?
You can find the full project with setup and execution guide in the GitHub repo: Click Here