Building a RAG System with Async FastAPI, Qdrant, Langchain and OpenAI


Introduction
In the era of advanced AI applications, Retrieval-Augmented Generation (RAG) stands out as a game-changing approach. By combining retrieval techniques with generative models, RAG enhances the quality, accuracy, and relevance of generated outputs. This blog walks you through building a scalable and efficient RAG system using FastAPI, Qdrant, LangChain, and OpenAI, all while leveraging asynchronous capabilities for improved performance.
At FutureSmart AI, we are committed to pioneering innovative solutions and leveraging cutting-edge technologies. Building RAG systems with async FastAPI, Qdrant, LangChain, and OpenAI has helped us create efficient and highly scalable AI-powered applications for our clients. This blog reflects our dedication to empowering developers and organizations with the actionable knowledge needed to implement high-performance systems.
While this blog focuses on an on-premise setup for a hands-on approach, drawing from our experience we can assure you that these tools also support scalable cloud-based deployments, ensuring flexibility for production-ready solutions. At FutureSmart AI, we’re always exploring and refining methods to push the boundaries of what AI can achieve.
Overview of Retrieval-Augmented Generation (RAG)
RAG combines two essential components:
Retrieval: Finds relevant documents from a large dataset. This component uses a search mechanism to identify the passages most relevant to the input query.
Generation: Uses a language model to generate context-aware answers. Once relevant information is retrieved, the language model produces the final response by incorporating the retrieved context into the generated text.
This integration empowers Retrieval-Augmented Generation (RAG) to deliver more accurate and contextually relevant responses compared to standalone Large Language Models (LLMs).
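Conceptually, the whole loop fits in a few lines. The sketch below is schematic only (generic retriever and llm objects, not the production pipeline built later in this post):
# Schematic RAG loop: retrieve relevant chunks, then generate an answer grounded in them
def answer(query, retriever, llm):
    docs = retriever.invoke(query)                       # Retrieval step
    context = "\n\n".join(d.page_content for d in docs)  # Assemble retrieved context
    prompt = f"Answer using only this context:\n{context}\n\nQuestion: {query}"
    return llm.invoke(prompt).content                    # Generation step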
For a comprehensive understanding, explore our Langchain RAG Course: From Basics to Production-Ready RAG Chatbot or, if you prefer reading, visit our detailed Blog for more insights.
The Tech Stack: What You Need & Why
Let's break down our tools and why we chose them. Each one plays a crucial role in building a powerful RAG system.
FastAPI
FastAPI enables the rapid development of performant web APIs with asynchronous capabilities. Its support for Python type hints makes it developer-friendly and robust.
For more information, check out the FastAPI Tutorial.
Qdrant
Qdrant excels in high-dimensional vector storage and retrieval operations. In our enterprise implementations, it has proven invaluable for:
Efficient management of large-scale vector datasets
Optimal performance in similarity search operations
Seamless horizontal scaling capabilities
For a detailed, in-depth explanation, please refer to our Comprehensive Guide to Installing and Using Qdrant VectorDB with Docker Server and Local Setup.
LangChain
LangChain and its components, such as chains, prompts, and memory, enable efficient interaction with LLMs.
OpenAI
We will use OpenAI’s language models in this tutorial. You'll also need a basic understanding of how to send queries to OpenAI’s API and interpret responses.
Prerequisites
- Create a Python Virtual Environment
It’s recommended to use a virtual environment to isolate your dependencies.
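For example, on macOS/Linux (the Windows activation command differs):
python -m venv venv
source venv/bin/activate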
- Install Dependencies
Use the provided requirements.txt file to install the necessary Python packages.
pip install -r requirements.txt
- Set up API Keys
To connect to external services like OpenAI and Qdrant, you need to set up API keys securely.
- Create a .env File
Create a .env file in the root of your project directory to store sensitive information like API keys and configuration details.
Example .env file:
OPENAI_API_KEY=your_openai_api_key
qdrant_db_path=http://localhost:6333 # Replace with your Qdrant URL
llm_provider="openai"
model="gpt-4o-mini"
temperature="0.1"
chunk_size = 2000
no_of_chunks = 3
- Load Environment Variables
Use libraries like python-dotenv to load the .env file into your application.
from dotenv import load_dotenv
import os
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
qdrant_db_path = os.getenv("qdrant_db_path")
Project Structure
services/
    logger.py
    pydantic_models.py
uploads/
    xxx.txt
    yyy.pdf
    zzz.docx
utils/
    __init__.py
    db_utils.py
    langchain_utils.py
    prompts.py
    qdrant_utils.py
    utils.py
.env
api.py
services/:
This folder houses essential services that support core functionalities:
- logger.py: Manages the logging setup for the application. Logging is critical for debugging, monitoring, and tracking the application's behavior.
- pydantic_models.py: Defines Pydantic models used for data validation and serialization. Pydantic ensures data entering the application is valid and formatted correctly.
uploads/:
A dedicated folder for file uploads. This is where the application stores temporary or permanent files uploaded by users.
utils/:
A utility module containing helper scripts that encapsulate reusable logic:
- __init__.py: Marks the folder as a Python package.
- db_utils.py: Contains functions for interacting with the database.
- langchain_utils.py: Provides utility functions for integrating LangChain, a framework for language model applications.
- prompts.py: Stores pre-defined prompts for interacting with language models or other AI systems.
- qdrant_utils.py: Handles operations with Qdrant, a vector search engine for similarity-based search.
- utils.py: General-purpose utility functions used across the project.
.env:
A configuration file storing environment variables like database credentials, API keys, and other sensitive data.
api.py:
The application's entry point, where FastAPI is initialized and routes are defined. This file connects all the components and defines the API endpoints.
Setting Up Qdrant for Efficient Retrieval
Imports and Configuration
import os
import time
from dotenv import load_dotenv
from uuid import uuid4
import asyncio
# Langchain imports
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_qdrant import QdrantVectorStore
from langchain_openai import OpenAIEmbeddings
# Qdrant imports
from qdrant_client import QdrantClient, AsyncQdrantClient
from qdrant_client.http.models import Distance, VectorParams
from services.logger import logger
load_dotenv(override=True)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
qdrant_db_path=os.getenv("qdrant_db_path")
The setup begins with importing essential libraries, loading environment variables (like API keys), and initializing necessary configurations. Notable imports include LangChain's RecursiveCharacterTextSplitter
for chunking documents and Qdrant’s async client for vector database interactions.
DocumentIndexer Class
The DocumentIndexer
class handles indexing and retrieval in Qdrant. Let’s break it down step-by-step.
Initialization
class DocumentIndexer:
    def __init__(self, qdrant_db_path):
        self.db_path = qdrant_db_path
        self.embedding_function = OpenAIEmbeddings(model="text-embedding-3-large", api_key=OPENAI_API_KEY)
        self.vector_store = None
        self.client = AsyncQdrantClient(self.db_path)
- embedding_function: Uses OpenAI's embeddings to convert text into dense vector representations.
- client: Initializes an async Qdrant client to manage the vector database.
- vector_store: The Qdrant vector store used to add documents and manage their vector representations.
Indexing Text in Qdrant
The method index_in_qdrantdb
handles the extraction and indexing of document text. Here’s how it works:
async def index_in_qdrantdb(self, extracted_text, file_name, doc_type, chunk_size):
    try:
        # Create a Document object
        doc = Document(
            page_content=extracted_text,
            metadata={
                "file_name": file_name,
                "doc_type": doc_type
            }
        )
        # The chunk size from the .env file takes precedence over the argument
        chunk_size = int(os.getenv("chunk_size"))
        logger.info(f"Using dynamic chunk size: {chunk_size}")

        # Split the document
        text_splitter = RecursiveCharacterTextSplitter(
            separators=['\n\n', '\n', ','],
            chunk_size=chunk_size,
            chunk_overlap=200
        )
        docus = text_splitter.split_documents([doc])

        # Generate UUIDs for all chunks
        uuids = [f"{str(uuid4())}" for _ in range(len(docus))]

        collection = "rag_demo_collection"
        collections = await self.client.get_collections()
        if collection in [collection_name.name for collection_name in collections.collections]:
            logger.info(f"Collection {collection} already exists in QdrantDB")
        else:
            await self.client.create_collection(
                collection_name=collection,
                vectors_config=VectorParams(size=3072, distance=Distance.COSINE))

        self.vector_store = QdrantVectorStore.from_existing_collection(
            collection_name=collection,
            embedding=self.embedding_function,
            url=self.db_path
        )
        await self.vector_store.aadd_documents(documents=docus, ids=uuids)
        logger.info("Successfully indexed document in QdrantDB")
        return True
    except Exception as e:
        logger.error(f"Error indexing document in QdrantDB: {e}")
        raise
Key Points:
- Document Creation: A Document object is created to hold the extracted text and its metadata, and the data is loaded and split asynchronously for efficient processing.
- Chunking: The document is divided into manageable chunks using RecursiveCharacterTextSplitter.
- Collection Management: The Qdrant collection is created only if it doesn't already exist.
- Batch Indexing: Chunks are added to the Qdrant database with unique UUIDs.
- Embeddings: OpenAI's embedding model creates the vector representations of the documents.
- Async Upload: Documents are uploaded to Qdrant asynchronously for similarity search.
Retrieving Documents
To enable querying of indexed data, the get_retriever
method returns a retriever:
async def get_retriever(self, top_k):
    try:
        collection = "rag_demo_collection"
        if self.vector_store is None:
            self.vector_store = QdrantVectorStore.from_existing_collection(
                collection_name=collection,
                embedding=self.embedding_function,
                url=self.db_path
            )
        return self.vector_store.as_retriever(search_type="similarity", search_kwargs={"k": top_k})
    except Exception as e:
        logger.error(f"Error creating retriever: {e}")
        raise
Key Points:
- Retriever Initialization: If the vector_store object doesn't exist, it is initialized from the existing collection.
- Search Parameters: Supports similarity-based searches with a configurable top_k parameter.
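As a quick illustration of how the retriever is consumed, the following sketch (run inside an async function, assuming documents have already been indexed into rag_demo_collection) fetches the top matches for a query:
indexer = DocumentIndexer(qdrant_db_path)
retriever = await indexer.get_retriever(top_k=3)
docs = await retriever.ainvoke("What does the report say about quarterly revenue?")
for doc in docs:
    print(doc.metadata.get("file_name"), doc.page_content[:100])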
Implementing the Asynchronous FastAPI Endpoint
Asynchronous endpoints allow the server to handle multiple requests simultaneously, which is essential for applications that process large files or perform complex computations.
Setting Up FastAPI
The first step is initializing a FastAPI application that supports asynchronous request handling. This allows the server to process multiple incoming requests without blocking other operations, essential for high-performance APIs.
app = FastAPI()
The application is initialized with the FastAPI()
class, which serves as the primary entry point for defining routes and handling requests.
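With api.py as the entry point (per the project structure above) and Uvicorn installed, the development server can be started with:
uvicorn api:app --reload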
Defining API Routes
The code leverages async def
for efficient non-blocking request handling, ensuring high performance under concurrent loads. Two main routes are implemented: the /upload-knowledge
endpoint and the /chat
endpoint. These routes demonstrate seamless integration of file processing, database operations, and conversational AI.
Document Ingestion Endpoint (/upload-knowledge)
- Allows users to upload files containing knowledge documents.
- Extracts text from the uploaded file and indexes it in a database for future query responses.
@app.post("/upload-knowledge")
async def upload_knowledge(
username: str = Form(...),
file: Optional[UploadFile] = File(None)
):
try:
# Handle file extraction and indexing
extracted_text = ""
if file:
logger.info(f"File uploaded: {file.filename}")
file_content = await file.read()
file_extension = file.filename.split('.')[-1].lower()
extracted_text = await extract_text_from_file(file_content, file_extension)
logger.info(f"Extracted text from file: {extracted_text}")
await index_documents(username, extracted_text, file.filename, file_extension)
return {'response': 'Indexed Documents Successfully', 'extracted_text': extracted_text}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error processing indexing request: {str(e)}")
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
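For a quick manual test, the endpoint can be called from a small client script. This is a sketch using the httpx library (not part of the project requirements), assuming the API runs locally on port 8000 and a sample.pdf exists in the working directory:
import httpx

with open("sample.pdf", "rb") as f:
    response = httpx.post(
        "http://localhost:8000/upload-knowledge",
        data={"username": "demo_user"},
        files={"file": ("sample.pdf", f, "application/pdf")},
        timeout=120,
    )
print(response.json())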
File Text Extraction
The function extract_text_from_file
extracts text from different file types (e.g., TXT, PDF, DOCX).
# Asynchronous file text extraction
async def extract_text_from_file(file_content: bytes, file_type: str) -> str:
    """
    Extract text from different file types based on the file type.
    """
    if file_type == "txt":
        return await extract_text_from_txt(file_content)
    elif file_type == "pdf":
        return await extract_text_from_pdf(file_content)
    elif file_type == "docx":
        return await extract_text_from_docx(file_content)
    else:
        raise HTTPException(status_code=400, detail="Unsupported file type")
PDF Text Extraction
For PDF files, text extraction requires libraries like PyPDF2. Here's the asynchronous implementation:
# Async version of the extract_text_from_pdf
async def extract_text_from_pdf(file_content: bytes) -> str:
    """
    Extract text from a PDF file.
    """
    return await asyncio.to_thread(extract_text_from_pdf_sync, file_content)

def extract_text_from_pdf_sync(file_content: bytes) -> str:
    """
    Extract text from a PDF file (blocking version).
    """
    content = ""
    # PdfReader expects a file-like object, so wrap the raw bytes in BytesIO (requires `import io`)
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_content))
    num_pages = len(pdf_reader.pages)
    for i in range(num_pages):
        page = pdf_reader.pages[i]
        content += page.extract_text()
    return content
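The TXT and DOCX extractors referenced earlier follow the same pattern; the full versions are in the GitHub repo linked at the end, but a minimal sketch (assuming the python-docx package is installed) might look like this:
import io
import asyncio
from docx import Document as DocxDocument

async def extract_text_from_txt(file_content: bytes) -> str:
    # Plain text files can be decoded directly from bytes
    return file_content.decode("utf-8", errors="ignore")

async def extract_text_from_docx(file_content: bytes) -> str:
    # python-docx is blocking, so run it in a worker thread like the PDF version
    return await asyncio.to_thread(extract_text_from_docx_sync, file_content)

def extract_text_from_docx_sync(file_content: bytes) -> str:
    doc = DocxDocument(io.BytesIO(file_content))
    return "\n".join(paragraph.text for paragraph in doc.paragraphs)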
Indexing Documents
The index_documents
function stores the extracted text in a Qdrant database, optimized for vector search and similarity queries.
async def index_documents(username, extracted_text, filename, file_extension):
    try:
        indexer = DocumentIndexer(qdrant_db_path)
        start_time = time.time()
        logger.info("Indexing document in QdrantDB...")
        await indexer.index_in_qdrantdb(
            extracted_text=extracted_text,
            file_name=filename,
            doc_type=file_extension,
            chunk_size=1500
        )
        logger.info(f"Document indexing completed in {time.time() - start_time:.2f} seconds")
    except Exception as e:
        logger.error(f"Error processing documents: {str(e)}")
        raise RuntimeError(f"Failed to process documents: {str(e)}")
Refer to the GitHub Code at the end of this article for Text extraction from different sources.
Chat Query Endpoint (/chat)
- Accepts user queries and provides responses based on the ingested knowledge.
- Handles previous session data to maintain conversational context.
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
try:
# Process chat request
if request.session_id is not None:
past_messages = await get_past_conversation_async(request.session_id)
else:
request.session_id = str(uuid4())
past_messages = []
response, refined_query, extracted_documents = await generate_chatbot_response(
request.query, past_messages, request.no_of_chunks, request.username
)
await add_conversation_async(request.session_id, request.query, response)
return {
"username": request.username,
"query": request.query,
"refine_query": refined_query,
"response": response,
"session_id": request.session_id,
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error processing chat request: {str(e)}")
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
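Once a document has been uploaded, the chat endpoint can be exercised the same way (again a sketch using httpx and assuming the server runs on localhost:8000):
import httpx

payload = {
    "username": "demo_user",
    "query": "Summarize the uploaded document in two sentences.",
    "session_id": None,   # a new session is created when this is None
    "no_of_chunks": 3,
}
response = httpx.post("http://localhost:8000/chat", json=payload, timeout=120)
print(response.json()["response"])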
Session Context Management
Fetching Past Conversations
To retain context, we retrieve past conversations from the SQLite database. Each session ID serves as a key to fetch previous interactions.
async def get_past_conversation_async(session_id: str) -> List[dict]:
    start_time = asyncio.get_event_loop().time()
    messages = []
    try:
        # Open an async SQLite connection
        async with aiosqlite.connect("chat_log.db") as connection:
            await connection.execute('''CREATE TABLE IF NOT EXISTS chat_logs (
                session_id TEXT,
                user_query TEXT,
                gpt_response TEXT
            )''')
            logger.info("Database schema ensured.")

            # Fetch chat logs for the given session_id
            async with connection.execute(
                "SELECT user_query, gpt_response FROM chat_logs WHERE session_id=?", (session_id,)
            ) as cursor:
                async for row in cursor:
                    message_user = {"role": "user", "content": row[0]}
                    message_assistant = {"role": "assistant", "content": row[1]}
                    messages.extend([message_user, message_assistant])

        elapsed_time = asyncio.get_event_loop().time() - start_time
        logger.info(f"History For Context (get_conversation): {messages} in {elapsed_time:.2f}s")
        return messages
    except Exception as e:
        logger.exception(f"Error occurred: {str(e)}")
        raise e
Adding New Conversations
New conversations are stored in the database after processing. This ensures the chatbot can build upon prior interactions.
async def add_conversation_async(session_id, user_query, gpt_response):
    try:
        # Open an async SQLite connection
        # Note: ":memory:" is an ephemeral database that vanishes when the connection
        # closes; point this at the same file used above (e.g. "chat_log.db") if history
        # must persist across requests (see Future Improvements).
        async with aiosqlite.connect(":memory:") as connection:
            cursor = await connection.cursor()
            # Create table if it doesn't exist
            await cursor.execute('''CREATE TABLE IF NOT EXISTS chat_logs (
                session_id TEXT,
                user_query TEXT,
                gpt_response TEXT)''')
            # Insert new conversation
            await cursor.execute("INSERT INTO chat_logs (session_id, user_query, gpt_response) VALUES (?, ?, ?)",
                                 (session_id, user_query, gpt_response))
            await connection.commit()
            logger.info(f"Conversation added for session {session_id}")
    except Exception as e:
        logger.exception(f"Error occurred while adding conversation: {str(e)}")
        raise e
Request and Response Models
FastAPI leverages Pydantic models for robust data validation and serialization, ensuring input data adheres to the expected format. For example:
ChatRequest Model
- Defines the structure of incoming requests to the chat endpoint, including the username, query, and optional session_id.
from pydantic import BaseModel, field_validator
from typing import Optional, List

class ChatRequest(BaseModel):
    username: str
    query: str
    session_id: Optional[str] = None
    no_of_chunks: Optional[int] = 3
ChatResponse Model
- Specifies the format of the API response, including the query, refined query, and chatbot response.
class ChatResponse(BaseModel):
    username: str
    query: str
    refine_query: str
    response: str
    session_id: str
    debug_info: Optional[dict] = None
Orchestrating with LangChain
Implementing a RAG Chain
Combining vector search results with prompt engineering.
@ls.traceable(run_type="chain", name="Chat Pipeline")
async def generate_chatbot_response(query, past_messages, no_of_chunks, username):
    """Main function to generate chatbot responses asynchronously."""
    logger.info("Refining user query")
    refined_query = await refine_user_query(query, past_messages)  # Async call
    logger.info(f"Generated refined query: {refined_query}")

    extracted_text_data, extracted_documents = await retrieve_similar_documents(refined_query, int(no_of_chunks), username)  # Async call
    # logger.info(f"Extracted text data: {extracted_text_data}")
    logger.info("Extracted text data")

    llm = initialize_llm()  # Synchronous initialization
    history = create_history(past_messages)
    logger.info(f"Created history for session: {history}")

    logger.info("Fetching response")
    start_time = time.time()
    final_response, cb = await invoke_chain(query, extracted_text_data, history, llm)  # Async call
    response_time = time.time() - start_time
    # logger.info(f"Got response from chain: {final_response}")
    logger.info("Got response from chain")

    return final_response, response_time, cb.prompt_tokens, cb.completion_tokens, cb.total_tokens, extracted_text_data, refined_query, extracted_documents
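initialize_llm and create_history are helpers from langchain_utils.py; the exact implementations are in the GitHub repo, but hedged sketches driven by the .env settings shown earlier could look like this:
import os
from langchain_openai import ChatOpenAI
from langchain_community.chat_message_histories import ChatMessageHistory

def initialize_llm():
    # Model name and temperature come from the .env file shown earlier
    return ChatOpenAI(
        model=os.getenv("model", "gpt-4o-mini"),
        temperature=float(os.getenv("temperature", "0.1")),
        api_key=os.getenv("OPENAI_API_KEY"),
    )

def create_history(past_messages):
    # Convert stored {"role": ..., "content": ...} dicts into a LangChain chat history
    history = ChatMessageHistory()
    for message in past_messages:
        if message["role"] == "user":
            history.add_user_message(message["content"])
        else:
            history.add_ai_message(message["content"])
    return history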
Query Refinement
User queries are often ambiguous, relying on prior interactions or chat history. To address this, we design a refinement mechanism that converts user input into a standalone, self-contained question that can be understood without the chat history.
def get_query_refiner_prompt():
    contextualize_q_system_prompt = ("""
        "Given a chat history and the latest user question "
        "which might reference context in the chat history, "
        "formulate a standalone question which can be understood "
        "without the chat history. Do NOT answer the question, "
        "just reformulate it if needed and otherwise return it as it is."
    """)
    final_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", contextualize_q_system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            ("human", "{query}"),
        ]
    )
    # print(final_prompt)
    return final_prompt
async def refine_user_query(query, messages):
    """Refines the user query asynchronously."""
    llm = ChatOpenAI(temperature=0, model_name="gpt-4o")
    history = create_history(messages)
    prompt = get_query_refiner_prompt()
    refined_query_chain = prompt | llm | StrOutputParser()
    refined_query = await refined_query_chain.ainvoke({"query": query, "messages": history.messages})  # Async method
    return refined_query
Document Retrieval
The refined query serves as input for a retrieval mechanism. Using Qdrant, we extract contextually similar documents for subsequent processing.
async def retrieve_similar_documents(refined_query: str, num_of_chunks: int, username: str) -> str:
    try:
        indexer = DocumentIndexer(qdrant_db_path)
        start_time = time.time()
        logger.info("Searching for similar documents in Qdrant...")

        if num_of_chunks is None:
            # Environment variables are strings, so cast before validating
            num_of_chunks = int(os.getenv('no_of_chunks'))
        if not isinstance(num_of_chunks, int) or num_of_chunks <= 0:
            raise ValueError(f"Invalid number of chunks: {num_of_chunks}")

        retriever = await indexer.get_retriever(top_k=num_of_chunks)
        if not retriever:
            raise ValueError("Failed to initialize document retriever")

        extracted_documents = await retriever.ainvoke(refined_query)
        if not extracted_documents:
            extracted_text_data = ""
        else:
            extracted_text_data = await format_docs(extracted_documents)

        logger.info(f"Document retrieval and formatting completed in {time.time() - start_time:.2f} seconds")
        return extracted_text_data, extracted_documents
    except Exception as e:
        logger.error(f"Error processing documents: {str(e)}")
        raise RuntimeError(f"Failed to process documents: {str(e)}")
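format_docs simply concatenates the retrieved chunks into a single context string; a minimal sketch (the exact formatting in the repo may differ) is shown below:
async def format_docs(documents) -> str:
    # Join the page content of each retrieved chunk, separated by blank lines
    return "\n\n".join(doc.page_content for doc in documents)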
Prompt Engineering
Effective prompts are the backbone of any LLM-based pipeline. We design a system prompt that combines user inputs with the retrieved context.
def get_main_prompt():
    prompt = """
        "You are an assistant for question-answering tasks."
        "Use the following pieces of retrieved context and user information to answer the question."
        "If you don't find the answer of the query, then just say: I don't have that information at hand. Please provide more details or check your sources."
    """
    prompt = prompt + "\n\n" + "{context}"
    final_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", prompt),
            MessagesPlaceholder(variable_name="messages"),
            ("human", "{user_query}")
        ])
    return final_prompt
async def invoke_chain(query, context, history, llm):
    """Handles the streamed response asynchronously."""
    logger.info("Initializing chain...")
    final_chain = get_main_prompt() | llm | StrOutputParser()
    logger.info("Chain initialized.")
    input_data = {"user_query": query, "context": context, "messages": history.messages}
    with get_openai_callback() as cb:
        final_response = await final_chain.ainvoke(input_data)  # Asynchronous method
    return final_response, cb
Learn how to leverage LangChain Expression Language (LCEL) for seamless chain composition, including prompt formatting, retrieval-augmented generation (RAG), and efficient batching, with practical examples. Discover how LCEL simplifies building advanced LLM applications with features like streaming, parallelism, and async support! Check out the video below.
Future Improvements
Enhanced Document Preprocessing: Implement advanced text chunking methods incorporating summarization, document-based chunking, semantic and agentic chunking, and multimodal support.
Storage: Transitioning from in-memory to persistent storage ensures data durability across sessions; migrating to async PostgreSQL enhances scalability and performance for larger datasets and higher user concurrency.
Dynamic Few-Shot Learning: Automatically generate examples based on query type.
Adaptive Retrieval: Use feedback loops to improve retrieval accuracy over time.
Real-Time User Feedback: Allow users to fine-tune the response in real-time.
Conclusion
Through our extensive experience with asynchronous FastAPI and building RAG systems, we have successfully optimized every operation in the pipeline to work seamlessly in an asynchronous manner. From document ingestion and indexing in Qdrant to efficient retrieval of relevant context and history storage, we have made each operation highly efficient by adopting async-first principles.
By adopting this approach, developers can craft intelligent systems capable of providing contextually accurate and highly relevant responses. From document indexing to dynamic query refinement and real-time conversational AI, the RAG architecture represents a significant leap forward in harnessing the capabilities of large language models.
At FutureSmart AI, we specialize in delivering state-of-the-art AI solutions tailored to the unique needs of businesses. Leveraging advanced technologies such as RAG, NL2SQL, multi-agent architectures, LangChain, LangGraph, Qdrant, Chroma vector databases, and OpenAI, we have successfully implemented cutting-edge systems for multiple clients—from intelligent customer service automation to advanced AI-driven interview platforms.
If you want to leverage the power of AI Applications with asynchronous FastAPI, we’re here to help. Reach out to us at contact@futuresmart.ai and discover how our experience can translate into practical, cutting-edge solutions tailored for your needs.