How I Developed a Custom RAG Instead of Using LangChain

Rishee Panchal
12 min read

Introduction

If you're reading this, you probably know what LangChain is. It's a popular tool for building RAG pipelines and multi-agent systems. However, after working on multiple projects with LangChain and LangGraph, I ran into some challenges. You might wonder what issues can arise from a pre-made, abstracted library. Here are a few I've identified:

  • Overhead: Even basic tasks like chunking or retrieval need boilerplate or chaining mechanisms.

  • Black-box abstraction: Hard to debug or customize individual components.

These are just a couple of examples. When you create your own RAG, as I did, you'll understand how much easier it is to manage tasks without relying on prebuilt libraries.

This post walks through how I built this step by step. Each section includes a short explanation and a code snippet.

Document Processing

Before we start, we need to load the raw data, such as PDFs, text files, or DOCX files. I kept it simple by using PyPDF2 for PDFs, python-docx (imported as docx) for .docx files, and basic Python I/O for .txt files.

import docx
import PyPDF2
import os

def read_text_file(file_path: str):
  """Read content from a text file"""
  with open(file_path, 'r', encoding = "utf-8") as file:
    return file.read()

def read_pdf_file(file_path: str):
  """Read content from a PDF file"""
  text = ""
  with open(file_path, 'rb') as file:
    pdf_reader = PyPDF2.PdfReader(file)
    for page in pdf_reader.pages:
      text += page.extract_text() + "\n"
  return text

def read_docx_file(file_path: str):
  """Read content from a Docx file"""
  doc = docx.Document(file_path)
  return "\n".join([paragraph.text for paragraph in doc.paragraphs])

# Unified function to read any supported document
def read_document(file_path: str):
  """Read content based on the file extension"""
  _, file_extension = os.path.splitext(file_path)
  file_extension = file_extension.lower()

  if file_extension == '.txt':
    return read_text_file(file_path)

  elif file_extension == '.pdf':
    return read_pdf_file(file_path)

  elif file_extension == '.docx':
    return read_docx_file(file_path)

  else:
    raise ValueError(f"unsupported file extension: {file_extension}")
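
The if/elif chain above can also be written as a lookup table, which turns adding a new format into a one-line change. The sketch below is only an illustration under assumptions: `READERS` and `read_document_via_table` are hypothetical names not used elsewhere in this post, and only the plain-text reader is registered so the snippet runs without PyPDF2 or python-docx installed.

```python
import os
import tempfile
from typing import Callable, Dict


def read_text_file(file_path: str) -> str:
    """Read content from a UTF-8 text file."""
    with open(file_path, "r", encoding="utf-8") as file:
        return file.read()


# Hypothetical registry mapping a lowercase extension to its reader.
# In the full pipeline, '.pdf' and '.docx' would map to the readers above.
READERS: Dict[str, Callable[[str], str]] = {
    ".txt": read_text_file,
}


def read_document_via_table(file_path: str) -> str:
    """Pick a reader by file extension, or raise for unknown types."""
    _, ext = os.path.splitext(file_path)
    ext = ext.lower()
    if ext not in READERS:
        raise ValueError(f"unsupported file extension: {ext}")
    return READERS[ext](file_path)


def demo() -> str:
    """Write a temporary .TXT file and read it back through the table."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "note.TXT")
        with open(path, "w", encoding="utf-8") as f:
            f.write("hello rag")
        return read_document_via_table(path)
```

The behavior matches the original dispatch: extensions are compared case-insensitively, and unknown ones raise a ValueError.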

Creating Text Chunks

The next step is chunking: breaking the text into manageable pieces. The function below first normalizes the text by replacing newlines with spaces, then splits it into sentences and groups them into chunks. A new chunk starts whenever adding the next sentence would push the current one past chunk_size (500 characters by default). Two caveats: the size check counts sentence lengths but not the spaces that join them, and a single sentence longer than chunk_size still becomes its own oversized chunk, so the limit is a target rather than a hard guarantee.

def split_text(text: str, chunk_size: int = 500):
  """Split the document extracted text into chunks"""
  sentences = text.replace('\n', ' ').split('. ')
  chunks = []
  current_chunk = []
  current_size = 0

  for sentence in sentences:
    sentence = sentence.strip()
    if not sentence:
      continue
    if not sentence.endswith('.'):
      sentence += '.'

    sentence_size = len(sentence)

    if current_size + sentence_size > chunk_size and current_chunk:
      chunks.append(' '.join(current_chunk))
      current_chunk = [sentence]
      current_size = sentence_size
    else:
      current_chunk.append(sentence)
      current_size += sentence_size

  if current_chunk:
    chunks.append(' '.join(current_chunk))

  return chunks
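
Because split_text only starts a new chunk between sentences, one very long sentence can still exceed chunk_size. If a hard limit matters, a post-processing pass is one option. The sketch below is an assumption of mine rather than part of the original pipeline: `enforce_max_size` is a hypothetical helper that re-splits oversized chunks at word boundaries and slices any single word that is itself too long.

```python
from typing import List


def enforce_max_size(chunks: List[str], chunk_size: int = 500) -> List[str]:
    """Split any chunk longer than chunk_size into pieces that fit.

    Pieces are cut at word boundaries where possible; a single word
    longer than chunk_size is sliced by character count.
    """
    result: List[str] = []
    for chunk in chunks:
        if len(chunk) <= chunk_size:
            result.append(chunk)
            continue

        current = ""
        for word in chunk.split():
            # Slice words that cannot fit in a chunk on their own.
            while len(word) > chunk_size:
                if current:
                    result.append(current)
                    current = ""
                result.append(word[:chunk_size])
                word = word[chunk_size:]
            candidate = f"{current} {word}" if current else word
            if len(candidate) > chunk_size:
                # The word does not fit; close the current piece first.
                result.append(current)
                current = word
            else:
                current = candidate
        if current:
            result.append(current)
    return result
```

It would be applied right after chunking, for example `chunks = enforce_max_size(split_text(content))`. The trade-off is that re-split pieces may end mid-sentence.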

Setting up ChromaDB

Instead of relying on a remote vector database, I opted to run ChromaDB locally for simplicity and performance: reads and writes happen on local disk, so there is no network round trip adding latency. The collection uses the all-MiniLM-L6-v2 sentence-transformer model to embed text.

import chromadb
from chromadb.utils import embedding_functions

client = chromadb.PersistentClient(path = "chromadb")

sentence_transformer_embedding = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name = "all-MiniLM-L6-v2"
)

collection = client.get_or_create_collection(
    name = "documents",
    embedding_function = sentence_transformer_embedding
)
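
To make the role of the vector store more concrete, here is a toy illustration of nearest-neighbor lookup over embeddings. It is not how ChromaDB is implemented (it uses an approximate index internally); it is a brute-force sketch with made-up 3-dimensional vectors. The distance used is squared L2, which, as far as I know, is ChromaDB's default; treat that as an assumption. `squared_l2`, `nearest`, and `toy_store` are hypothetical names.

```python
from typing import Dict, List, Tuple


def squared_l2(a: List[float], b: List[float]) -> float:
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def nearest(
    store: Dict[str, List[float]],
    query_vec: List[float],
    n_results: int = 2,
) -> List[Tuple[str, float]]:
    """Return the n_results closest (id, distance) pairs, nearest first."""
    scored = [(doc_id, squared_l2(vec, query_vec)) for doc_id, vec in store.items()]
    scored.sort(key=lambda pair: pair[1])
    return scored[:n_results]


# Made-up 3-dimensional "embeddings"; real all-MiniLM-L6-v2 vectors have 384 dimensions.
toy_store = {
    "doc_a": [1.0, 0.0, 0.0],
    "doc_b": [0.0, 1.0, 0.0],
    "doc_c": [0.9, 0.1, 0.0],
}

print(nearest(toy_store, [1.0, 0.0, 0.0]))
```

Smaller distances mean closer matches, which is why the search results later in this post are listed in ascending order of distance.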

Inserting data into ChromaDB

With ChromaDB set up, the next step is inserting data. The previously defined read_document and split_text functions do the preparation: read_document loads a file's content, and split_text breaks it into chunks. Each chunk then gets a metadata entry (source file and chunk index) and a unique ID before being added to the collection in batches.

def process_document(file_path: str):
  """Prepare the data for insertion into ChromaDB by reading, chunking,
  and attaching metadata and IDs to the document."""
  try:
    # Read the document content from the file
    content = read_document(file_path)

    # If no content is extracted (e.g. an empty string), log a warning and return empty lists
    if not content:
      print(f"Warning: No content extracted from {file_path}")
      return [], [], []

    # Split the document into manageable text chunks
    chunks = split_text(content)

    # Extract file name to use in metadata and IDs
    file_name = os.path.basename(file_path)

    # Create metadata: one dictionary per chunk with source and chunk index
    metadata = [{"source": file_name, "chunk": i} for i in range(len(chunks))]

    # Generate unique IDs for each chunk based on filename and chunk index
    # (named ids to avoid shadowing Python's built-in id function)
    ids = [f"{file_name}_chunk_{i}" for i in range(len(chunks))]

    return ids, chunks, metadata

  except Exception as e:
    # Log any unexpected error and return empty lists
    print(f"Error Processing Data from {file_path}: {str(e)}")
    return [], [], []


def add_to_collection(collection, id, texts, metadata):
  """Add documents to the vector store collection in batches of 100 for efficiency."""
  if not texts:
    # If there are no texts to add, exit early
    return

  batch_size = 100
  # Iterate over the texts in batches
  for i in range(0, len(texts), batch_size):
    idx = min(i + batch_size, len(texts))
    # Add the current batch of texts, metadata, and IDs to the collection
    collection.add(
        documents = texts[i:idx],
        metadatas = metadata[i:idx],
        ids = id[i:idx]
    )


def process_and_add_documents(collection, folder_path: str):
  """Process all documents in the specified folder and add them to the collection."""
  # List all file paths in the folder (skip directories)
  files = [os.path.join(folder_path, file)
          for file in os.listdir(folder_path)
          if os.path.isfile(os.path.join(folder_path, file))]

  for file_path in files:
    # Log progress
    print(f"Processing {os.path.basename(file_path)}")

    # Process the document into chunks, metadata, and IDs
    id, texts, metadata = process_document(file_path)

    # Add the processed data to the collection
    add_to_collection(collection, id, texts, metadata)

    # Log how many chunks were added
    print(f"Added {len(texts)} chunks to collection")

Example usage:

folder_path = "/content/drive/MyDrive/Docs"
process_and_add_documents(collection, folder_path)

output:
Processing Company_ QuantumNext Systems.docx
Added 2 chunks to collection
Processing Company_ GreenFields BioTech.docx
Added 2 chunks to collection
Processing Company_ TechWave Innovations.docx
Added 1 chunks to collection
Processing GreenGrow Innovations_ Company History.docx
Added 5 chunks to collection
Processing GreenGrow's EcoHarvest System_ A Revolution in Farming.pdf
Added 6 chunks to collection
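
One thing to watch with IDs built from the file name alone: two files with the same name in different folders would produce identical IDs. A possible variation, sketched below under my own assumptions, mixes a short hash of the absolute path into each ID. `make_chunk_ids` is a hypothetical helper, not part of the code above.

```python
import hashlib
import os
from typing import List


def make_chunk_ids(file_path: str, n_chunks: int) -> List[str]:
    """Build IDs that stay stable across runs and differ across folders."""
    # Hash the absolute path so same-named files in different folders differ.
    abs_path = os.path.abspath(file_path)
    digest = hashlib.sha1(abs_path.encode("utf-8")).hexdigest()[:12]
    file_name = os.path.basename(file_path)
    return [f"{file_name}_{digest}_chunk_{i}" for i in range(n_chunks)]
```

Because the IDs are deterministic, re-processing the same file yields the same IDs. ChromaDB collections also expose an upsert method, which may be a better fit than add when re-ingesting a folder; I have not used it in this project, so check the ChromaDB documentation before relying on it.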

Once we have embedded vectors stored in ChromaDB, we can run semantic search on the collection we just created.

def semantic_search(collection, query: str, n_results: int = 2):
  """Perform semantic search on collection"""
  # Query the vector store with the input query string and get top n_results matches
  results = collection.query(
      query_texts = [query],
      n_results = n_results
  )
  return results

def get_context_with_sources(results):
  """Get context & source"""
  # Join all retrieved document chunks from the first query result into one context string
  context = "\n\n".join(results['documents'][0])  # accesses the first batch from retrieved docs

  # Extract and format source information from metadata for each chunk
  sources = [
      f"{meta['source']} (chunk {meta['chunk']})"
      for meta in results['metadatas'][0]
  ]
  return context, sources

Example usage:

query = "When was GreenGrow invented?"
results = semantic_search(collection, query)

output: 
{'ids': [['GreenGrow Innovations_ Company History.docx_chunk_0',
   'GreenGrow Innovations_ Company History.docx_chunk_1']],
 'embeddings': None,
 'documents': [['GreenGrow Innovations was founded in 2010 by Sarah Chen and Michael Rodriguez.....']],
 'uris': None,
 'included': ['metadatas', 'documents', 'distances'],
 'data': None,
 'metadatas': [[{'chunk': 0,
    'source': 'GreenGrow Innovations_ Company History.docx'},
   {'source': 'GreenGrow Innovations_ Company History.docx', 'chunk': 1}]],
 'distances': [[0.7566683292388916, 0.8583546876907349]]}

The raw output is hard to read, so we can use the code below to tidy up the results.

def print_search_results(results):
  """Print formatted search results"""
  print("\nSearch Results:\n" + "-" * 50)

  for i in range(len(results['documents'][0])):
    doc = results['documents'][0][i]
    metadata = results['metadatas'][0][i]
    distance = results['distances'][0][i]

    print(f"\nResult: {i+1}")
    print(f"Source: {metadata['source']}, Chunk {metadata['chunk']}")
    print(f"Distance: {distance}")
    print(f"Content: {doc}")

print_search_results(results)

output:
Search Results:
--------------------------------------------------

Result: 1
Source: GreenGrow Innovations_ Company History.docx, Chunk 0
Distance: 0.7566683292388916
Content: GreenGrow Innovations was founded in 2010 by Sarah Chen and Michael Rodriguez...
Result: 2
Source: GreenGrow Innovations_ Company History.docx, Chunk 1
Distance: 0.8583546876907349
Content: Their first product, the WaterWise Sensor, was launched in 2012 and quickly....
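
The distance values make it possible to drop weak matches before they reach the prompt. The sketch below is one assumed way to do that: `filter_by_distance` is a hypothetical helper, the 0.8 cutoff is arbitrary and would need tuning per dataset, and `sample_results` is a hand-written stand-in that mirrors the shape of the query output shown earlier.

```python
from typing import Any, Dict, List, Tuple


def filter_by_distance(
    results: Dict[str, Any],
    max_distance: float,
) -> List[Tuple[str, Dict[str, Any], float]]:
    """Keep (document, metadata, distance) triples at or below max_distance."""
    docs = results["documents"][0]
    metas = results["metadatas"][0]
    dists = results["distances"][0]
    return [
        (doc, meta, dist)
        for doc, meta, dist in zip(docs, metas, dists)
        if dist <= max_distance
    ]


# Hand-written stand-in with the same nesting as a single-query result.
sample_results = {
    "documents": [["chunk about founding", "chunk about products"]],
    "metadatas": [[
        {"source": "history.docx", "chunk": 0},
        {"source": "history.docx", "chunk": 1},
    ]],
    "distances": [[0.7566683292388916, 0.8583546876907349]],
}

kept = filter_by_distance(sample_results, max_distance=0.8)
```

With this sample, only the first chunk survives the cutoff. An empty result would be a signal to skip generation or tell the user that nothing relevant was found.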

Setting up Gemini

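For generation I used the Gemini API, specifically Gemini Flash via the google.generativeai package. The snippet below reads the API key from Colab's userdata, so it assumes a Google Colab environment.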

import google.generativeai as genai
from google.colab import userdata
import os

# Retrieve the API key securely from Colab's userdata
GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')

# Configure the Gemini client with the API key
genai.configure(api_key=GOOGLE_API_KEY)

def get_prompt(context: str, conversation_history: str, query: str):
  """Generate a prompt combining context, history and query"""
  # Create a formatted prompt string that includes document context, prior conversation, and user query
  prompt = f"""Based on the following context and conversation history, please provide a
  relevant and contextual response. If the answer cannot be derived from the context,
  only use the conversation history or say 'I cannot answer this based on the provided information'

  Context from documents: {context}
  previous conversation: {conversation_history}
  Human: {query}
  Assistant:"""

  return prompt

def generate_response(query: str, context: str, conversation_history: str = ""):
    """Generate a response using Gemini with context and conversation history"""
    # Generate the full prompt with context and query
    prompt = get_prompt(context, conversation_history, query)

    # Initialize the Gemini model
    model = genai.GenerativeModel("gemini-2.5-flash")

    try:
        # Generate a response with deterministic output (temperature = 0.0)
        response = model.generate_content(
            prompt,
            generation_config={
                "temperature": 0.0,
                "max_output_tokens": 500,
            }
        )
        return response.text
    except Exception as e:
        # Handle any errors that occur during generation
        return f"Error generating response: {str(e)}"
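
A small detail about triple-quoted f-strings inside a function: the indentation of the source lines ends up in the prompt text. It is usually harmless, but if you want a clean prompt, one option is to dedent the fixed instructions and append the variable parts afterwards. This is a sketch of that idea, not the version used in this project; `build_prompt` is a hypothetical name.

```python
import textwrap


def build_prompt(context: str, conversation_history: str, query: str) -> str:
    """Assemble the prompt without the indentation of the source code."""
    instructions = textwrap.dedent("""\
        Based on the following context and conversation history, please provide a
        relevant and contextual response. If the answer cannot be derived from the
        context, only use the conversation history or say
        'I cannot answer this based on the provided information'.
        """)
    # Insert user-supplied text after dedenting so its own whitespace is untouched.
    return (
        f"{instructions}\n"
        f"Context from documents: {context}\n"
        f"Previous conversation: {conversation_history}\n"
        f"Human: {query}\n"
        f"Assistant:"
    )
```

Building the prompt in a separate function also makes it easy to print and inspect exactly what is sent to the model.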

Test basic RAG

Finally, it is time to test our basic RAG, i.e., to put together everything we've done so far.

def rag_query(collection, query: str, n_chunks: int = 2):
  """Retrieve relevant chunks and generate an answer"""
  results = semantic_search(collection, query, n_chunks)
  context, sources = get_context_with_sources(results)
  response = generate_response(query, context)
  return response, sources

query = "When was GreenGrow innovations founded?"
response, sources = rag_query(collection, query)

print('\nQuery:', query)
print('\nAnswer:', response)
print('\nSources used:')
for source in sources:
  print(f" - {source}")

output:
Query: When was GreenGrow innovations founded?

Answer: GreenGrow Innovations was founded in 2010.

Sources used:
 - GreenGrow Innovations_ Company History.docx (chunk 0)
 - GreenGrow Innovations_ Company History.docx (chunk 4)

------------------------------------------------------------
query = "When was Tesla founded?"
response, sources = rag_query(collection, query)

print('\nQuery:', query)
print('\nAnswer:', response)
print('\nSources used:')
for source in sources:
  print(f" - {source}")

output:
Query: When was Tesla founded?

Answer: I cannot answer this based on the provided information.

Sources used:
 - Company_ GreenFields BioTech.docx (chunk 0)
 - GreenGrow Innovations_ Company History.docx (chunk 0)

Session & Message management

Our basic RAG pipeline is functioning quite well, but it's not finished yet. One significant limitation is the absence of conversation history. This matters in RAG pipelines because it lets the LLM refer back to previous interactions. By keeping a record of past queries and responses, the model can resolve follow-up questions and give more contextually relevant answers.

import uuid
from datetime import datetime
import json

# In-memory storage for conversations keyed by session ID
conversations = {}

def create_session():
  """Create a new session"""
  # Generate a unique session ID
  session_id = str(uuid.uuid4())

  # Initialize an empty message list for the session
  conversations[session_id] = []
  return session_id

def add_message(session_id: str, role: str, content: str):
  """Add a message to conversation history"""
  # Ensure session exists in the conversation store
  if session_id not in conversations:
    conversations[session_id] = []

  # Append the message with role, content, and timestamp
  conversations[session_id].append({
      "role": role,
      "content": content,
      "timestamp": datetime.now().isoformat()
  })

def get_conversation_history(session_id: str, max_messages: int = None):
  """Get conversation history for a session"""
  # Return empty if session does not exist
  if session_id not in conversations:
    return []

  # Fetch conversation history
  history = conversations[session_id]

  # Optionally limit the number of messages returned
  if max_messages:
    history = history[-max_messages:]

  return history

# Creating a formatted conversation history string, assigning "Human" or "Assistant" roles

def format_conversation_history(session_id: str, max_messages: int = 5):
  """Format conversation history for adding in prompts"""
  # Retrieve most recent messages from session
  history = get_conversation_history(session_id, max_messages)

  # Format each message with appropriate role label
  formatted_history = ""
  for msg in history:
    role = "Human" if msg["role"] == "user" else "Assistant"
    formatted_history += f"{role}: {msg['content']}\n\n"

  return formatted_history.strip()

def contextualize_query(query: str, conversation_history: str):
    """
    Reformulate follow-up questions into standalone queries using Gemini.
    """
    # Prompt instructs model to rewrite the question based on prior chat history
    prompt = """Given a chat history and the latest user question
which might reference context in the chat history, formulate a standalone
question which can be understood without the chat history.
Do NOT answer the question, just reformulate it if needed and otherwise return it as is.

Chat history:
{history}

Question:
{question}

Rewritten standalone question:""".format(
        history=conversation_history.strip(),
        question=query.strip()
    )

    # Initialize Gemini model
    model = genai.GenerativeModel("gemini-2.5-flash")

    try:
        # Generate the reformulated standalone question
        response = model.generate_content(
            prompt,
            generation_config={
                "temperature": 0.0,
                "max_output_tokens": 100,
            }
        )
        return response.text.strip()
    except Exception as e:
        # Fallback to original query on failure
        print(f"Error contextualizing query: {str(e)}")
        return query
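
To see how the history window behaves without calling Gemini, here is a compact, self-contained sketch of the same in-memory pattern. It condenses the functions above rather than replacing them, and the follow-up question "Who founded it?" is an illustrative input I made up for the demo.

```python
import uuid
from datetime import datetime
from typing import Dict, List

# Self-contained copy of the in-memory store pattern described above.
conversations: Dict[str, List[dict]] = {}


def create_session() -> str:
    """Create a new session and return its ID."""
    session_id = str(uuid.uuid4())
    conversations[session_id] = []
    return session_id


def add_message(session_id: str, role: str, content: str) -> None:
    """Append a timestamped message, creating the session if needed."""
    conversations.setdefault(session_id, []).append(
        {"role": role, "content": content, "timestamp": datetime.now().isoformat()}
    )


def format_conversation_history(session_id: str, max_messages: int = 5) -> str:
    """Format the last max_messages messages (assumed positive) for a prompt."""
    history = conversations.get(session_id, [])[-max_messages:]
    lines = []
    for msg in history:
        role = "Human" if msg["role"] == "user" else "Assistant"
        lines.append(f"{role}: {msg['content']}")
    return "\n\n".join(lines)


sid = create_session()
add_message(sid, "user", "When was GreenGrow Innovations founded?")
add_message(sid, "assistant", "GreenGrow Innovations was founded in 2010.")
add_message(sid, "user", "Who founded it?")

print(format_conversation_history(sid, max_messages=2))
```

With max_messages=2, only the last two messages are printed: the assistant's answer followed by the follow-up question. A follow-up like "Who founded it?" is exactly the kind of query that contextualize_query is meant to rewrite into a standalone question.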

Integrating the conversation history in our RAG

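Adding conversation history to the prompt lets the model stay coherent across turns. The functions below redefine get_prompt and generate_response from earlier with a slightly reworded prompt; both accept a conversation_history argument.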

def get_prompt(context, conversation_history, query):
  prompt = f"""Based on the following context and conversation history, please provide
           a relevant and contextual response. If the answer cannot be derived from the
           context, only use the conversation history or say 'I cannot answer this
           based on the provided context.'
           context from documents: {context}
           previous conversation: {conversation_history}
           Human: {query}
           Assistant: """
  return prompt

def generate_response(query: str, context: str, conversation_history: str = ""):
    """Generate a response using Gemini with context and conversation history"""
    prompt = get_prompt(context, conversation_history, query)
    model = genai.GenerativeModel("gemini-2.5-flash")
    try:
        response = model.generate_content(
            prompt,
            generation_config={
                "temperature": 0.0,
                "max_output_tokens": 500,
            }
        )
        return response.text
    except Exception as e:
        return f"Error generating response: {str(e)}"

Creating our FINAL RAG

At this point, everything is modular: embedding, storage, retrieval, generation, and session handling. You can integrate this with a Flask or FastAPI backend or connect it to a UI as well; however, I'm not doing that here.

def conversational_rag(collection, query: str, session_id: str, n_chunks: int = 2):
  """Perform RAG query with session and conversational history"""
  # Format the recent history, then rewrite the query as a standalone question
  conversation_history = format_conversation_history(session_id)
  query = contextualize_query(query, conversation_history)
  print("Contextualized query:", query)
  # Retrieve relevant chunks for the rewritten query
  context, sources = get_context_with_sources(semantic_search(collection, query, n_chunks))
  print("Context:", context)
  print("Sources:", sources)
  # Generate the answer and record both turns in the session
  response = generate_response(query, context, conversation_history)
  add_message(session_id, "user", query)
  add_message(session_id, "assistant", response)
  return response, sources

session_id = create_session()
query = "When was GreenGrow innovations founded?"
response, sources = conversational_rag(collection, query, session_id)
print(response)

output:
Contextualized query: When was GreenGrow innovations founded?
Context: GreenGrow Innovations was founded in 2010 by Sarah Chen and Michael Rodriguez...
Sources: ['GreenGrow Innovations_ Company History.docx (chunk 0)', 'GreenGrow Innovations_ Company History.docx (chunk 4)']
GreenGrow Innovations was founded in 2010.
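
Because each stage is a plain function, the same flow can be exercised offline by passing in stand-ins for the model and the vector store. The sketch below is an assumption about how one might structure such a check, not the project's code: `conversational_flow` and the three `fake_*` stubs are hypothetical, and the stubs return canned values instead of calling Gemini or ChromaDB.

```python
from typing import Callable, Dict, List, Tuple

# Per-session list of (role, text) turns, kept in memory.
history_store: Dict[str, List[Tuple[str, str]]] = {}


def conversational_flow(
    query: str,
    session_id: str,
    rewrite: Callable[[str, str], str],
    retrieve: Callable[[str], Tuple[str, List[str]]],
    generate: Callable[[str, str, str], str],
) -> Tuple[str, List[str]]:
    """Run rewrite -> retrieve -> generate, then record both turns."""
    turns = history_store.setdefault(session_id, [])
    history = "\n\n".join(f"{role}: {text}" for role, text in turns)
    standalone = rewrite(query, history)
    context, sources = retrieve(standalone)
    answer = generate(standalone, context, history)
    turns.append(("Human", standalone))
    turns.append(("Assistant", answer))
    return answer, sources


# Hypothetical stubs standing in for Gemini and ChromaDB.
def fake_rewrite(query: str, history: str) -> str:
    return query if not history else f"{query} [rewritten with history]"


def fake_retrieve(query: str) -> Tuple[str, List[str]]:
    return "stub context", ["stub.docx (chunk 0)"]


def fake_generate(query: str, context: str, history: str) -> str:
    return f"answer to '{query}' using '{context}'"


first, _ = conversational_flow("Q1", "s1", fake_rewrite, fake_retrieve, fake_generate)
second, _ = conversational_flow("Q2", "s1", fake_rewrite, fake_retrieve, fake_generate)
```

On the second call the history is no longer empty, so the rewrite step runs with real input. Swapping the stubs for contextualize_query, semantic_search plus get_context_with_sources, and generate_response gives the pipeline shown above.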

Conclusion

Building a custom RAG pipeline from scratch wasn't just a technical task; it was a deliberate choice to favor clarity, control, and customizability over convenience. While LangChain and similar frameworks allow for quick prototyping, they often have hidden complexities and tightly coupled parts that make debugging, optimizing, or experimenting difficult.

By choosing a basic approach, I gained:

  • Complete visibility into chunking, embedding, retrieval, and generation.

  • Easier debugging and better control over unusual cases.

  • Modular components that can be swapped (e.g., using Gemini today and switching to LLaMA or Claude tomorrow).

This method may not scale as quickly as an orchestrated agent framework, but for focused, explainable RAG applications, especially those used in research, education, or clinical settings, simplicity often wins.

Code & Project

You can explore the full code, including modular functions and a simple API interface, on my GitHub:
🔗 VanillaRAG

Written by

Rishee Panchal

I’m a computer science student exploring AI and machine learning through hands-on projects and critical thinking. My work spans from experimenting with LLMs and transformers to building practical AI tools. I believe in learning by doing, questioning assumptions, and sharing insights along the way.