Building a RAG System with Async FastAPI, Qdrant, Langchain and OpenAI

Shreyas Dhaware · 14 min read

Introduction

In the era of advanced AI applications, Retrieval-Augmented Generation (RAG) stands out as a game-changing approach. By combining retrieval techniques with generative models, RAG enhances the quality, accuracy, and relevance of generated outputs. This blog walks you through building a scalable and efficient RAG system using FastAPI, Qdrant, LangChain, and OpenAI, all while leveraging asynchronous capabilities for improved performance.

At FutureSmart AI, we are committed to pioneering innovative solutions and leveraging cutting-edge technologies. Building RAG systems with async FastAPI, Qdrant, LangChain, and OpenAI has helped us create efficient and highly scalable AI-powered applications for our clients. This blog reflects our dedication to empowering developers and organizations with the actionable knowledge needed to implement high-performance systems.

While this blog focuses on an on-premise setup for a hands-on approach, our experience shows that these tools also support scalable cloud-based deployments, ensuring flexibility for production-ready solutions. At FutureSmart AI, we're always exploring and refining methods to push the boundaries of what AI can achieve.

Overview of Retrieval-Augmented Generation (RAG)

RAG combines two essential components:

  1. Retrieval: Finds relevant documents in a large dataset. This step uses a search mechanism to identify the passages most relevant to the input query.

  2. Generation: Uses a language model to generate context-aware answers. Once relevant information is retrieved, the language model produces the final response by incorporating the retrieved context.

This integration empowers Retrieval-Augmented Generation (RAG) to deliver more accurate and contextually relevant responses compared to standalone Large Language Models (LLMs).
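
In pseudocode, the flow looks roughly like this; note that retrieve and generate are hypothetical placeholders for the components we build later in this post:

# Conceptual sketch only: retrieve() and generate() are hypothetical placeholders
def answer_with_rag(query: str) -> str:
    relevant_chunks = retrieve(query, top_k=3)  # 1. similarity search over indexed documents
    context = "\n\n".join(chunk.page_content for chunk in relevant_chunks)
    return generate(query, context)             # 2. LLM answer grounded in the retrieved context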

For a comprehensive understanding, explore our Langchain RAG Course: From Basics to Production-Ready RAG Chatbot or, if you prefer reading, visit our detailed Blog for more insights.

The Tech Stack: What You Need & Why

Let's break down our tools and why we chose them. Each one plays a crucial role in building a powerful RAG system.

FastAPI

FastAPI enables the rapid development of performant web APIs with asynchronous capabilities. Its support for Python type hints makes it developer-friendly and robust.

For more information, check out the FastAPI Tutorial.
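
As a quick, self-contained illustration (separate from the project code below), an async endpoint can await I/O without blocking other requests:

from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.get("/health")
async def health():
    await asyncio.sleep(0.1)  # stands in for non-blocking I/O such as a DB or API call
    return {"status": "ok"}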

Qdrant

Qdrant excels in high-dimensional vector storage and retrieval operations. In our enterprise implementations, it has proven invaluable for:

  • Efficient management of large-scale vector datasets

  • Optimal performance in similarity search operations

  • Seamless horizontal scaling capabilities

For a detailed and in-depth explanation, please refer to our Comprehensive Guide to Installing and Using Qdrant VectorDB with Docker Server and Local Setup.

LangChain

LangChain and its components, such as chains, prompts, and memory, enable efficient interaction with LLMs.

OpenAI

We will use OpenAI’s language models in this tutorial. You'll also need a basic understanding of how to send queries to OpenAI’s API and interpret responses.
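
If you haven't called the API before, here is a minimal sketch of a query through LangChain's OpenAI wrapper (the model name is only an example); it assumes OPENAI_API_KEY is already set in your environment:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)  # reads OPENAI_API_KEY from the environment
reply = llm.invoke("In one sentence, what is Retrieval-Augmented Generation?")
print(reply.content)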

Prerequisites

  1. Create a Python Virtual Environment

It’s recommended to use a virtual environment to isolate your dependencies.

  2. Install Dependencies

Use the provided requirements.txt file to install the necessary Python packages.

pip install -r requirements.txt

  3. Setting up API Keys

To connect to external services like OpenAI and Qdrant, you need to set up API keys securely.

  4. Create a .env File

Create a .env file in the root of your project directory to store sensitive information like API keys and configuration details.

Example .env file:

OPENAI_API_KEY=your_openai_api_key
qdrant_db_path=http://localhost:6333  # Replace with your Qdrant URL
llm_provider="openai"
model="gpt-4o-mini"
temperature="0.1"
chunk_size=2000
no_of_chunks=3

  5. Load Environment Variables

Use libraries like python-dotenv to load the .env file into your application.

from dotenv import load_dotenv
import os

load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
qdrant_db_path = os.getenv("qdrant_db_path")

Project Structure

services/
    logger.py
    pydantic_models.py
uploads/
    xxx.txt
    yyy.pdf
    zzz.docx
utils/
    __init__.py
    db_utils.py
    langchain_utils.py
    prompts.py
    qdrant_utils.py
    utils.py
.env
api.py

  • services/: This folder houses essential services that support core functionalities:

    • logger.py: Manages the logging setup for the application. Logging is critical for debugging, monitoring, and tracking the application's behavior; a minimal sketch of this module appears after this list.

    • pydantic_models.py: Defines Pydantic models used for data validation and serialization. Pydantic ensures data entering the application is valid and formatted correctly.

  • uploads/: A dedicated folder for file uploads. This is where the application stores temporary or permanent files uploaded by users.

  • utils/: A utility module containing helper scripts that encapsulate reusable logic:

    • __init__.py: Marks the folder as a Python package.

    • db_utils.py: Contains functions for interacting with the database.

    • langchain_utils.py: Provides utility functions for integrating LangChain, a framework for language model applications.

    • prompts.py: Stores pre-defined prompts for interacting with language models or other AI systems.

    • qdrant_utils.py: Handles operations with Qdrant, a vector search engine for similarity-based search.

    • utils.py: General-purpose utility functions used across the project.

  • .env: A configuration file storing environment variables like database credentials, API keys, and other sensitive data.

  • api.py: The application's entry point, where the FastAPI app is initialized and routes are defined. This file connects all the components and exposes the API endpoints.
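
services/logger.py is not shown in this post; a minimal version (an assumption of what the repo contains, the actual setup may add file handlers or custom formatting) could be as simple as:

import logging

# Configure a shared logger for the whole application
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("rag_app")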

Setting Up Qdrant for Efficient Retrieval

Imports and Configuration

import os
import time
from dotenv import load_dotenv
from uuid import uuid4
import asyncio

# Langchain imports
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_qdrant import QdrantVectorStore
from langchain_openai import OpenAIEmbeddings

# Qdrant imports
from qdrant_client import QdrantClient, AsyncQdrantClient
from qdrant_client.http.models import Distance, VectorParams

from services.logger import logger

load_dotenv(override=True)

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
qdrant_db_path = os.getenv("qdrant_db_path")

The setup begins with importing essential libraries, loading environment variables (like API keys), and initializing necessary configurations. Notable imports include LangChain's RecursiveCharacterTextSplitter for chunking documents and Qdrant’s async client for vector database interactions.

DocumentIndexer Class

The DocumentIndexer class handles indexing and retrieval in Qdrant. Let’s break it down step-by-step.

Initialization

class DocumentIndexer:
    def __init__(self, qdrant_db_path):
        self.db_path = qdrant_db_path
        self.embedding_function = OpenAIEmbeddings(model="text-embedding-3-large", api_key=OPENAI_API_KEY)
        self.vector_store = None
        self.client = AsyncQdrantClient(self.db_path)

  • embedding_function: Uses OpenAI’s embeddings to convert text into dense vector representations.

  • client: Initializes an async Qdrant client to manage the vector database.

  • vector_store: Qdrant vector store is used to add documents and manage their vector representations.

Indexing Text in Qdrant

The index_in_qdrantdb method chunks the extracted document text and indexes it in Qdrant. Here’s how it works:

async def index_in_qdrantdb(self, extracted_text, file_name, doc_type, chunk_size):
    try:
        # Create a Document object
        doc = Document(
            page_content=extracted_text,
            metadata={
                "file_name": file_name,
                "doc_type": doc_type
            }
        )


        # Use the chunk size from .env if set, otherwise fall back to the argument
        chunk_size = int(os.getenv("chunk_size", chunk_size))
        logger.info(f"Using dynamic chunk size: {chunk_size}")

        # Split the document
        text_splitter = RecursiveCharacterTextSplitter(
            separators=['\\n\\n', '\\n', ','],
            chunk_size=chunk_size,
            chunk_overlap=200
        )
        docus = text_splitter.split_documents([doc])

        # Generate UUIDs for all chunks
        uuids = [f"{str(uuid4())}" for _ in range(len(docus))]
        collection = "rag_demo_collection"

        collections = await self.client.get_collections()

        if collection in [collection_name.name for collection_name in collections.collections]:
            logger.info(f"Collection {collection} already exists in QdrantDB")
        else:
            await self.client.create_collection(
                collection_name=collection,
                vectors_config=VectorParams(size=3072, distance=Distance.COSINE))

        self.vector_store = QdrantVectorStore.from_existing_collection(collection_name=collection, embedding=self.embedding_function, url=self.db_path)

        await self.vector_store.aadd_documents(documents=docus, ids=uuids)

        logger.info(f"Successfully indexed document in QdrantDB")
        return True

    except Exception as e:
        logger.error(f"Error indexing document in QdrantDB: {e}")
        raise

Key Points:

  1. Document Creation: A Document object is created to hold the extracted text along with metadata such as the file name and document type.

  2. Chunking of Document: The document is divided into manageable chunks using RecursiveCharacterTextSplitter.

  3. Collection Management: The Qdrant collection is created only if it doesn’t already exist.

  4. Batch Indexing: Chunks are added to the Qdrant database with unique UUIDs.

  5. Embedding: OpenAI’s text-embedding-3-large model converts each chunk into a dense vector representation.

  6. Async Upload: Chunks are uploaded to Qdrant asynchronously via aadd_documents, making them available for similarity search.

Retrieving Documents

To enable querying of indexed data, the get_retriever method returns a retriever:

async def get_retriever(self, top_k):
    try:
        collection = "rag_demo_collection"
        if self.vector_store is None:
            self.vector_store = QdrantVectorStore.from_existing_collection(collection_name=collection, embedding=self.embedding_function, url=self.db_path)

        return self.vector_store.as_retriever(search_type="similarity", search_kwargs={"k": top_k})
    except Exception as e:
        logger.error(f"Error creating retriever: {e}")
        raise

Key Points:

  1. Retriever Initialization: If the vector_store object has not been created yet, it is initialized from the existing Qdrant collection before the retriever is returned.

  2. Search Parameters: Supports similarity-based searches with a configurable top_k parameter.

Implementing the Asynchronous FastAPI Endpoint

Asynchronous endpoints allow the server to handle multiple requests simultaneously, which is essential for applications that process large files or perform complex computations.

Setting Up FastAPI

The first step is initializing a FastAPI application that supports asynchronous request handling. This allows the server to process multiple incoming requests without blocking other operations, essential for high-performance APIs.

app = FastAPI()

The application is initialized with the FastAPI() class, which serves as the primary entry point for defining routes and handling requests.
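
To actually serve the app you need an ASGI server such as Uvicorn. A minimal way to run it (assuming the entry file is api.py, as in the project structure; equivalent to running uvicorn api:app --reload from the shell) is:

# at the bottom of api.py
import uvicorn

if __name__ == "__main__":
    uvicorn.run("api:app", host="0.0.0.0", port=8000, reload=True)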

Defining API Routes

The code leverages async def for efficient non-blocking request handling, ensuring high performance under concurrent loads. Two main routes are implemented: the /upload-knowledge endpoint and the /chat endpoint. These routes demonstrate seamless integration of file processing, database operations, and conversational AI.

  1. Document Ingestion Endpoint (/upload-knowledge)

    • Allows users to upload files containing knowledge documents.

    • Extracts text from the uploaded file and indexes it in the vector database so it can be used to answer future queries.

    @app.post("/upload-knowledge")
    async def upload_knowledge(
        username: str = Form(...),
        file: Optional[UploadFile] = File(None)
    ):
        try:
            # Handle file extraction and indexing
            extracted_text = ""
            if file:
                logger.info(f"File uploaded: {file.filename}")
                file_content = await file.read()
                file_extension = file.filename.split('.')[-1].lower()
                extracted_text = await extract_text_from_file(file_content, file_extension)
                logger.info(f"Extracted text from file: {extracted_text}")
                await index_documents(username, extracted_text, file.filename, file_extension)
            return {'response': 'Indexed Documents Successfully', 'extracted_text': extracted_text}
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
        except Exception as e:
            logger.error(f"Error processing indexing request: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")

File Text Extraction

The function extract_text_from_file extracts text from different file types (e.g., TXT, PDF, DOCX).

    # Asynchronous file text extraction
    async def extract_text_from_file(file_content: bytes, file_type: str) -> str:
        """
        Extract text from different file types based on the file type.
        """
        if file_type == "txt":
            return await extract_text_from_txt(file_content)
        elif file_type == "pdf":
            return await extract_text_from_pdf(file_content)
        elif file_type == "docx":
            return await extract_text_from_docx(file_content)
        else:
            raise HTTPException(status_code=400, detail="Unsupported file type")

PDF Text Extraction

For PDF files, text extraction requires libraries like PyPDF2. Here's the asynchronous implementation:

    # Async version of the extract_text_from_pdf
    async def extract_text_from_pdf(file_content: bytes) -> str:
        """
        Extract text from a PDF file.
        """
        return await asyncio.to_thread(extract_text_from_pdf_sync, file_content)

    def extract_text_from_pdf_sync(file_content: bytes) -> str:
        """
        Extract text from a PDF file (blocking version).
        """
        content = ""
        # PdfReader needs a file-like object (requires `import io`), so wrap the raw bytes in BytesIO
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_content))
        for page in pdf_reader.pages:
            content += page.extract_text() or ""
        return content
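
The TXT and DOCX helpers referenced in extract_text_from_file live in the full GitHub code; a minimal sketch of what they might look like (an assumption, using python-docx for DOCX files) is:

    # Hypothetical sketch of the remaining extractors (the GitHub repo may differ)
    import asyncio
    import io
    from docx import Document as DocxDocument  # python-docx

    async def extract_text_from_txt(file_content: bytes) -> str:
        """Decode a plain-text file."""
        return file_content.decode("utf-8", errors="ignore")

    async def extract_text_from_docx(file_content: bytes) -> str:
        """Extract text from a DOCX file in a worker thread."""
        return await asyncio.to_thread(extract_text_from_docx_sync, file_content)

    def extract_text_from_docx_sync(file_content: bytes) -> str:
        """Blocking DOCX extraction using python-docx."""
        doc = DocxDocument(io.BytesIO(file_content))
        return "\n".join(paragraph.text for paragraph in doc.paragraphs)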

Indexing Documents

The index_documents function stores the extracted text in a Qdrant database, optimized for vector search and similarity queries.

    async def index_documents(username, extracted_text, filename, file_extension):
        try:
            indexer = DocumentIndexer(qdrant_db_path)
            start_time = time.time()
            logger.info("Indexing document in Qdrant...")

            await indexer.index_in_qdrantdb(
                extracted_text=extracted_text,
                file_name=filename,
                doc_type=file_extension,
                chunk_size=1500  
            )
            logger.info(f"Document indexing completed in {time.time() - start_time:.2f} seconds")

        except Exception as e:
            logger.error(f"Error processing documents: {str(e)}")
            raise RuntimeError(f"Failed to process documents: {str(e)}")

Refer to the GitHub code at the end of this article for text extraction from other sources.

  2. Chat Query Endpoint (/chat)

    • Accepts user queries and provides responses based on the ingested knowledge.

    • Handles previous session data to maintain conversational context.

    @app.post("/chat", response_model=ChatResponse)
    async def chat(request: ChatRequest):
        try:
            # Process chat request
            if request.session_id is not None:
                past_messages = await get_past_conversation_async(request.session_id)
            else:
                request.session_id = str(uuid4())
                past_messages = []

            # generate_chatbot_response also returns timing and token-usage stats (see langchain_utils.py below)
            (response, response_time, prompt_tokens, completion_tokens, total_tokens,
             extracted_text_data, refined_query, extracted_documents) = await generate_chatbot_response(
                request.query, past_messages, request.no_of_chunks, request.username
            )
            await add_conversation_async(request.session_id, request.query, response)
            return {
                "username": request.username,
                "query": request.query,
                "refine_query": refined_query,
                "response": response,
                "session_id": request.session_id,
            }
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
        except Exception as e:
            logger.error(f"Error processing chat request: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")

Session Context Management

Fetching Past Conversations

To retain context, we retrieve past conversations from the SQLite database. Each session ID serves as a key to fetch previous interactions.

    async def get_past_conversation_async(session_id: str) -> List[dict]:
        start_time = asyncio.get_event_loop().time()
        messages = []

        try:
            # Open an async SQLite connection
            async with aiosqlite.connect("chat_log.db") as connection:
                await connection.execute('''CREATE TABLE IF NOT EXISTS chat_logs (
                    session_id TEXT,
                    user_query TEXT,
                    gpt_response TEXT
                )''')
                logger.info("Database schema ensured.")

                # Fetch chat logs for the given session_id
                async with connection.execute(
                    "SELECT user_query, gpt_response FROM chat_logs WHERE session_id=?", (session_id,)
                ) as cursor:
                    async for row in cursor:
                        message_user = {"role": "user", "content": row[0]}
                        message_assistant = {"role": "assistant", "content": row[1]}
                        messages.extend([message_user, message_assistant])

            elapsed_time = asyncio.get_event_loop().time() - start_time
            logger.info(f"History For Context (get_conversation): {messages} in {elapsed_time:.2f}s")
            return messages

        except Exception as e:
            logger.exception(f"Error occurred: {str(e)}")
            raise e

Adding New Conversations

New conversations are stored in the database after processing. This ensures the chatbot can build upon prior interactions.

    async def add_conversation_async(session_id, user_query, gpt_response):
        try:
            # Open an async SQLite connection (use the same chat_log.db file that history is read from)
            async with aiosqlite.connect("chat_log.db") as connection:
                cursor = await connection.cursor()

                # Create table if it doesn't exist
                await cursor.execute('''CREATE TABLE IF NOT EXISTS chat_logs (
                                            session_id TEXT,
                                            user_query TEXT,
                                            gpt_response TEXT)''')

                # Insert new conversation
                await cursor.execute("INSERT INTO chat_logs (session_id, user_query, gpt_response) VALUES (?, ?, ?)",
                                    (session_id, user_query, gpt_response))

                await connection.commit()
                logger.info(f"Conversation added for session {session_id}")

        except Exception as e:
            logger.exception(f"Error occurred while adding conversation: {str(e)}")
            raise e

Request and Response Models

FastAPI leverages Pydantic models for robust data validation and serialization, ensuring input data adheres to the expected format. For example:

  • ChatRequest Model

    • Defines the structure of incoming requests to the chat endpoint, including the username, query, and optional session_id.
    from pydantic import BaseModel, field_validator
    from typing import Optional, List

    class ChatRequest(BaseModel):
        username: str
        query: str
        session_id: Optional[str] = None
        no_of_chunks: Optional[int] = 3

  • ChatResponse Model

    • Specifies the format of the API response, including the query, refined query, and chatbot response.
    class ChatResponse(BaseModel):
        username: str
        query: str
        refine_query: str
        response: str
        session_id: str
        debug_info: Optional[dict] = None
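
Once the server is running, you can exercise both endpoints with a small client script. Here is a sketch using httpx; the URL, username, and file name are placeholders:

import httpx

with httpx.Client(base_url="http://localhost:8000", timeout=60) as client:
    # 1. Upload and index a document
    with open("uploads/xxx.txt", "rb") as f:
        r = client.post(
            "/upload-knowledge",
            data={"username": "alice"},
            files={"file": ("xxx.txt", f, "text/plain")},
        )
    print(r.json()["response"])

    # 2. Ask a question grounded in the uploaded knowledge
    r = client.post("/chat", json={"username": "alice", "query": "What does the document say?"})
    print(r.json()["response"])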

Orchestrating with LangChain

Implementing a RAG Chain

The RAG chain combines vector search results with prompt engineering to produce grounded, context-aware answers.

@ls.traceable(run_type="chain", name="Chat Pipeline")
async def generate_chatbot_response(query, past_messages, no_of_chunks, username):
    """Main function to generate chatbot responses asynchronously."""
    logger.info("Refining user query")
    refined_query = await refine_user_query(query, past_messages)  # Async call
    logger.info(f"Generated refined query: {refined_query}")

    extracted_text_data, extracted_documents = await retrieve_similar_documents(refined_query, int(no_of_chunks), username)  # Async call
    logger.info("Extracted text data from similar documents")


    llm = initialize_llm()  # Synchronous initialization
    history = create_history(past_messages)
    logger.info(f"Created history for session: {history}")

    logger.info("Fetching response")
    start_time = time.time()
    final_response, cb = await invoke_chain(query, extracted_text_data, history, llm)  # Async call
    response_time = time.time() - start_time

    # logger.info(f"Got response from chain: {final_response}")
    logger.info(f"Got response from chain:")

    return final_response, response_time, cb.prompt_tokens, cb.completion_tokens, cb.total_tokens, extracted_text_data, refined_query, extracted_documents
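
The helpers initialize_llm and create_history come from langchain_utils.py and are not shown in the post; a plausible minimal version (an assumption, the repo implementation may differ) is:

import os
from langchain_openai import ChatOpenAI
from langchain_community.chat_message_histories import ChatMessageHistory

def initialize_llm():
    """Create the chat model from the .env settings."""
    return ChatOpenAI(model=os.getenv("model", "gpt-4o-mini"),
                      temperature=float(os.getenv("temperature", 0.1)))

def create_history(messages):
    """Convert stored {'role', 'content'} dicts into a LangChain message history."""
    history = ChatMessageHistory()
    for msg in messages:
        if msg["role"] == "user":
            history.add_user_message(msg["content"])
        else:
            history.add_ai_message(msg["content"])
    return history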

Query Refinement

User queries are often ambiguous or rely on prior interactions in the chat history. To address this, we add a refinement step that rewrites the user's input into a standalone question that can be understood without the chat history.
def get_query_refiner_prompt():
    contextualize_q_system_prompt = ("""
    "Given a chat history and the latest user question "
    "which might reference context in the chat history, "
    "formulate a standalone question which can be understood "
    "without the chat history. Do NOT answer the question, "
    "just reformulate it if needed and otherwise return it as it is."
    """)

    final_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", contextualize_q_system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            ("human","{query}"),
        ]
    )
    # print(final_prompt)
    return final_prompt

async def refine_user_query(query, messages):
    """Refines the user query asynchronously."""
    llm = ChatOpenAI(temperature=0, model_name="gpt-4o")
    history = create_history(messages)
    prompt = get_query_refiner_prompt()
    refined_query_chain = prompt | llm | StrOutputParser()
    refined_query = await refined_query_chain.ainvoke({"query": query, "messages": history.messages})  # Async method
    return refined_query

Document Retrieval

The refined query serves as input for a retrieval mechanism. Using Qdrant, we extract contextually similar documents for subsequent processing.

async def retrieve_similar_documents(refined_query: str, num_of_chunks: int, username: str):
    try:
        indexer = DocumentIndexer(qdrant_db_path)
        start_time = time.time()
        logger.info("Searching for similar documents in Qdrant...")

        if num_of_chunks is None:
            num_of_chunks = int(os.getenv('no_of_chunks', 3))
        if not isinstance(num_of_chunks, int) or num_of_chunks <= 0:
            raise ValueError(f"Invalid number of chunks: {num_of_chunks}")
        retriever = await indexer.get_retriever(top_k=num_of_chunks)
        if not retriever:
            raise ValueError("Failed to initialize document retriever")
        extracted_documents = await retriever.ainvoke(refined_query)
        if not extracted_documents:
            extracted_text_data=""
        else:
            extracted_text_data = await format_docs(extracted_documents)
        logger.info(f"Document retrieval and formatting completed in {time.time() - start_time:.2f} seconds")
        return extracted_text_data, extracted_documents

    except Exception as e:
        logger.error(f"Error processing documents: {str(e)}")
        raise RuntimeError(f"Failed to process documents: {str(e)}")
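
The format_docs helper flattens the retrieved Document objects into a single context string for the prompt. A minimal sketch (defined async because it is awaited above; the repo version may also include metadata such as the file name) could be:

async def format_docs(documents) -> str:
    """Join retrieved chunks into one context block for the prompt."""
    return "\n\n".join(doc.page_content for doc in documents)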

Prompt Engineering

Effective prompts are the backbone of any LLM-based pipeline. We design a system prompt that combines user inputs with the retrieved context.

def get_main_prompt():
    prompt = """
    "You are an assistant for question-answering tasks."
    "Use the following pieces of retrieved context and user information to answer the question."
    "If you don't find the answer to the query, just say: I don't have that information at hand. Please provide more details or check your sources."
    """
    prompt = prompt + "\n\n" + "{context}"

    final_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", prompt),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "{user_query}")
    ])
    return final_prompt

async def invoke_chain(query, context, history, llm):
    """Invokes the RAG chain asynchronously and tracks token usage."""
    logger.info("Initializing chain...")
    final_chain = get_main_prompt() | llm | StrOutputParser()
    logger.info("Chain initialized.")
    input_data = {"user_query": query, "context": context, "messages": history.messages}

    with get_openai_callback() as cb:
        final_response = await final_chain.ainvoke(input_data)  # Asynchronous method

    return final_response, cb

Learn how to leverage LangChain Expression Language (LCEL) for seamless chain composition, including prompt formatting, retrieval-augmented generation (RAG), and efficient batching, with practical examples. Discover how LCEL simplifies building advanced LLM applications with features like streaming, parallelism, and async support. Check out the video below.
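
As a taste of the idea, here is a minimal LCEL chain, assuming nothing beyond the packages already used in this post:

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
chain = prompt | ChatOpenAI(model="gpt-4o-mini") | StrOutputParser()

# The same chain supports sync, async, batch, and streaming execution
print(chain.invoke({"text": "LCEL composes prompts, models, and parsers with the | operator."}))
# await chain.ainvoke(...), chain.batch([...]), and chain.stream(...) also work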

Future Improvements

  • Enhanced Document Preprocessing: Implement advanced text chunking methods incorporating summarization, document-based chunking, semantic and agentic chunking, and multimodal support.

  • Storage: Transitioning from in-memory/local SQLite to persistent storage ensures data durability across sessions, and migrating to async PostgreSQL enhances scalability and performance for larger datasets and higher user concurrency.

  • Dynamic Few-Shot Learning: Automatically generate examples based on query type.

  • Adaptive Retrieval: Use feedback loops to improve retrieval accuracy over time.

  • Real-Time User Feedback: Allow users to fine-tune the response in real-time.

Conclusion

Through our extensive experience with asynchronous FastAPI and building RAG systems, we have successfully optimized every operation in the pipeline to work seamlessly in an asynchronous manner. From document ingestion and indexing in Qdrant to efficient retrieval of relevant context and history storage, we have made each operation highly efficient by adopting async-first principles.

By adopting this approach, developers can craft intelligent systems capable of providing contextually accurate and highly relevant responses. From document indexing to dynamic query refinement and real-time conversational AI, the RAG architecture represents a significant leap forward in harnessing the capabilities of large language models.

At FutureSmart AI, we specialize in delivering state-of-the-art AI solutions tailored to the unique needs of businesses. Leveraging advanced technologies such as RAG, NL2SQL, multi-agent architectures, LangChain, LangGraph, Qdrant, Chroma vector databases, and OpenAI, we have successfully implemented cutting-edge systems for multiple clients—from intelligent customer service automation to advanced AI-driven interview platforms.

If you want to leverage the power of AI Applications with asynchronous FastAPI, we’re here to help. Reach out to us at contact@futuresmart.ai and discover how our experience can translate into practical, cutting-edge solutions tailored for your needs.
