Parallel Fan-Out Retrieval System using LangChain and Vector Databases

Introduction

Parallel Query Retrieval (Fan-out) is a technique where a query is sent to multiple smaller partitions (or groups) of data at the same time, rather than one by one. This approach cuts wait times and improves efficiency, because responses are processed as soon as they arrive instead of sequentially.

It’s like asking different people to look for the same information in different sections of a library and getting their answers all at once, instead of waiting for each person to check one section at a time. This makes retrieving information faster and more efficient.
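Here is a minimal sketch of that idea in plain Python, using asyncio.as_completed so each answer is handled the moment it arrives (the section names and sleep delays are made up for illustration):

import asyncio
import random

async def search_section(section: str) -> str:
    # Simulate a lookup that takes a variable amount of time
    await asyncio.sleep(random.uniform(0.1, 1.0))
    return f"answer from the {section} section"

async def main():
    sections = ["history", "science", "fiction"]
    tasks = [asyncio.create_task(search_section(s)) for s in sections]
    # Handle each answer as soon as it arrives, not in submission order
    for finished in asyncio.as_completed(tasks):
        print(await finished)

asyncio.run(main())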

"Parallel Query Retrieval turns searching from a slow walk into a sprint — dividing the work to multiply the speed."
— Hitesh Choudhary

Architecture

Parallel Query Retrieval, or fan-out, makes things quicker and more efficient. It is used in apps and websites to fetch data from different places simultaneously, saving time and improving performance.

Now, let's break the term down into simple parts:

  • "Parallel" means doing many things at the same time.

  • "Query Retrieval" means asking for information (querying) and getting it (retrieval).

  • "Fan-out" is just a fancy way of saying sending out a bunch of requests all at once (like how a fan spreads out air).

Why do we use Parallel Query Retrieval (Fan-out)?

Using Parallel Query Retrieval (Fan-out) has some big advantages:

  • It's much faster than asking one-by-one.

  • It's more efficient, especially when you have many places to get data from (like different databases, services, or servers).

Example:

Let's say you're shopping online and want to compare prices for a product across different websites.

Without Parallel Query Retrieval, you ask each website one at a time and wait for each answer before moving to the next, so the total time is the sum of all the individual response times.

With Parallel Query Retrieval, you ask every website at once and collect the answers as they come back, so the total time is roughly that of the slowest single response.
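To make the difference concrete, here is a small timing sketch (the site names and one-second delays are stand-ins for real network calls). Run sequentially, the three lookups take roughly the sum of their delays; fanned out with asyncio.gather, they take roughly the longest single delay:

import asyncio
import time

async def check_price(site: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for a real network call
    return f"{site}: price fetched"

SITES = [("site-a", 1.0), ("site-b", 1.0), ("site-c", 1.0)]

async def sequential() -> float:
    start = time.perf_counter()
    for site, delay in SITES:
        await check_price(site, delay)
    return time.perf_counter() - start  # ~3s: sum of the delays

async def parallel() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(check_price(s, d) for s, d in SITES))
    return time.perf_counter() - start  # ~1s: the longest single delay

print("sequential:", round(asyncio.run(sequential()), 2))
print("parallel:  ", round(asyncio.run(parallel()), 2))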

Parallel Query Retrieval using Python


import asyncio
import aiohttp

async def fetch(session, url):
    # Fire one GET request and return the response body
    async with session.get(url) as response:
        # These pages return HTML, so read the body as text
        return await response.text()

async def parallel_fetch():
    urls = [
        'https://flipkart.com',
        'https://amazon.com',
        'https://myntra.com'
    ]

    # One shared session for all requests
    async with aiohttp.ClientSession() as session:
        # Fan out: one task per URL, all running concurrently
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print('Results:', results)

asyncio.run(parallel_fetch())

Parallel Query Retrieval in RAG apps with LangChain

In RAG systems, before generating a response, the system first retrieves documents relevant to your question.

Parallel Query Retrieval (Fan-out) means:

At retrieval time, the RAG system doesn’t search only one place (or one database). It "fans out" and queries multiple sources simultaneously!
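Before the full LangChain walkthrough below, here is a minimal, framework-agnostic sketch of that fan-out step. It assumes hypothetical retriever objects that each expose a similarity_search(query, k) method (like the per-group vectorstores built later in this post) and sends the same query to all of them on a thread pool:

from concurrent.futures import ThreadPoolExecutor

def fan_out(retrievers, query, k=3):
    # Submit the same query to every retriever at the same time
    with ThreadPoolExecutor(max_workers=len(retrievers)) as pool:
        futures = [pool.submit(r.similarity_search, query, k) for r in retrievers]
        # Merge the per-source hits into one flat result list
        return [doc for f in futures for doc in f.result()]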

Code

Before you run the program, make sure to install the required dependencies (langchain, openai, qdrant-client, pypdf, python-dotenv) and create a virtual environment. I am using "uv" to create the virtual environment; for more details, check out this video:

https://www.youtube.com/watch?v=8mk85fyzevc

To set up Qdrant locally, follow the setup guide in the resources below:

https://qdrant.tech/documentation/quickstart/

https://www.youtube.com/watch?v=mHrwS6ZoNKc

Algorithm:

Step 1 => Load the PDF file from disk.
Step 2 => Split the loaded PDF into smaller chunks (for example, 500 characters each).
Step 3 => Divide these chunks into equal groups (say 3 groups) — each group will behave like a separate "mini database."
Step 4 => For each group, create a vectorstore using an embedding model (OpenAI Embeddings).
Step 5 => When a user gives a query, send the query in parallel (fan-out) to all groups (vectorstores).
Step 6 => Retrieve the top-k similar documents from each group.
Step 7 => Merge the retrieved documents into a final result list.
Step 8 => Display the final results.

LangChain implementation

# IMPORT REQUIRED LIBRARIES
import asyncio
import os
from dotenv import load_dotenv
from typing import List
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Qdrant
from langchain.schema import Document
from langchain.llms import OpenAI

# LOAD ENV VARIABLES
load_dotenv()

# SET API KEYS
openai_api_key = os.getenv("OPENAI_API_KEY")

# INITIALIZE EMBEDDING MODEL
embedding_model = OpenAIEmbeddings(openai_api_key=openai_api_key)

# INITIALIZE LLM (OPTIONAL: NOT USED IN THE RETRIEVAL STEPS BELOW)
llm = OpenAI(
    temperature=0,
    openai_api_key=openai_api_key
)

# -------------------------------
# STEP 1: LOAD A SINGLE PDF
# -------------------------------

def load_documents(pdf_path: str) -> List[Document]:
    loader = PyPDFLoader(pdf_path)
    return loader.load()

# PDF PATH
pdf_path = "ecommerce_products.pdf"

# LOAD THE DOCUMENTS
documents = load_documents(pdf_path)
print(f"Loaded {len(documents)} documents from the PDF.")

# -------------------------------
# STEP 2: SPLIT INTO CHUNKS
# -------------------------------

splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_documents(documents)
print(f"Split into {len(chunks)} chunks.")

# -------------------------------
# STEP 3: DIVIDE CHUNKS INTO GROUPS
# -------------------------------

# SPLIT CHUNKS INTO 3 GROUPS
group_size = len(chunks) // 3
chunks_group_1 = chunks[:group_size]
chunks_group_2 = chunks[group_size:2*group_size]
chunks_group_3 = chunks[2*group_size:]

print(f"Chunks Group Sizes: {len(chunks_group_1)}, {len(chunks_group_2)}, {len(chunks_group_3)}")

# -------------------------------
# STEP 4: CREATE VECTOR DATABASES FOR EACH GROUP
# -------------------------------

# IN-MEMORY VECTOR STORES (location=":memory:" keeps each collection in RAM)
db_group_1 = Qdrant.from_documents(chunks_group_1, embedding=embedding_model, location=":memory:", collection_name="group_1")
db_group_2 = Qdrant.from_documents(chunks_group_2, embedding=embedding_model, location=":memory:", collection_name="group_2")
db_group_3 = Qdrant.from_documents(chunks_group_3, embedding=embedding_model, location=":memory:", collection_name="group_3")

print("Vectorstores created for all groups.")

# -------------------------------
# STEP 5: DEFINE ASYNC RETRIEVAL FUNCTIONS
# -------------------------------

async def async_retrieve_from_db(db, query):
    # Run the blocking similarity_search in a worker thread (top-3 hits per group)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, db.similarity_search, query, 3)

async def async_parallel_query_retrieval(query):
    tasks = [
        async_retrieve_from_db(db_group_1, query),
        async_retrieve_from_db(db_group_2, query),
        async_retrieve_from_db(db_group_3, query),
    ]
    results_nested = await asyncio.gather(*tasks)
    results = [doc for sublist in results_nested for doc in sublist]
    return results

# -------------------------------
# STEP 6: MAIN EXECUTION
# -------------------------------

query = "Best mobile phone under 20,000"

# RUN THE PARALLEL QUERY RETRIEVAL
results = asyncio.run(async_parallel_query_retrieval(query))

# PRINT THE RESULTS
print("\n=== Retrieved Results ===\n")
for i, doc in enumerate(results, 1):
    print(f"Result {i}:\n{doc.page_content[:300]}...\n---")

Output

Full working code on GitHub:

https://github.com/gautamkmahato

Conclusion

By using Parallel Query Retrieval (Fan-out) in a RAG (Retrieval-Augmented Generation) system, we can dramatically speed up how fast we fetch relevant information. Instead of searching one big database slowly, we divide the workload and retrieve from smaller groups at once, making the entire RAG pipeline faster, smarter, and more scalable. #chaicode
