Parallel Query Retrieval in RAG: In-Depth Guide with Code Examples

Ganesh Ghadage

In the previous article, Query Translation in RAG: 5 Powerful Techniques to Improve Retrieval Accuracy, we briefly discussed the Parallel Query Retrieval technique. In this article we will look at parallel query retrieval in depth and write the code for it.

In the Query Translation article we saw that we cannot trust the user query as written. A user might ask an ambiguous question, and if the query is ambiguous, the generated output will be ambiguous too.

What is Parallel Query (Fan-Out)?

Parallel Query Retrieval, also known as Multi-Query, is a technique where we create multiple queries based on the user's query. Each query covers a different aspect of what the user needs, including aspects the user may not even know they wanted.

Example:

Suppose a user wants to know about the fs module in Node.js and asks:

what is fs module?

But suppose our data set has some chunks that say "file system module" and are very important for answering the user's query. Those chunks may not be retrieved, because "fs" and "file system" have different vector embeddings and may not match closely enough during similarity search. In this case the generated answer won't be accurate.
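
The mismatch can be illustrated with a toy similarity score. The sketch below uses simple bag-of-words vectors, not real embeddings (an embedding model captures meaning far better than word overlap), and the chunk text and the `bow_cosine` helper are made up for illustration. It only shows why a reworded query can score higher against the same chunk.

```python
import math
from collections import Counter

def bow_cosine(a: str, b: str) -> float:
    """Cosine similarity between toy bag-of-words vectors of two strings."""
    va, vb = Counter(a.lower().split()), Counter(b.lower().split())
    # Counter returns 0 for missing tokens, so only shared tokens contribute
    dot = sum(va[t] * vb[t] for t in va)
    norm_a = math.sqrt(sum(c * c for c in va.values()))
    norm_b = math.sqrt(sum(c * c for c in vb.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

# Hypothetical chunk and queries, for illustration only
chunk = "the file system module lets you read and write files"
original = "what is fs module"
rewritten = "what is the file system module"

print(f"original : {bow_cosine(original, chunk):.3f}")
print(f"rewritten: {bow_cosine(rewritten, chunk):.3f}")
```

The rewritten query shares more terms with the chunk, so it scores higher. Generating several rewrites raises the chance that at least one of them lands close to the important chunk.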

So what we do is ask an LLM to generate 4-5 different queries based on the user query, like the ones below:

  1. What is the File System (FS) module?

  2. What is the purpose of the FS module?

  3. Describe the Node.js FS module.

  4. How does the FS module work?

  5. Explain the concept of the FS module.

The generated queries use more varied wording, so together they fetch more relevant chunks during similarity search.

How Parallel Query Retrieval works

Step 1: Generating multiple queries

When the user asks a query, we pass it to a small LLM and ask it to generate multiple variations of it.

import os
from dotenv import load_dotenv

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import BaseOutputParser
from typing import List, Dict

load_dotenv()

class LineListOutputParser(BaseOutputParser[List[str]]):
  """Output parser for a list of lines."""

  def parse(self, text: str) -> List[str]:
    lines = text.strip().split("\n")
    return list(filter(None, lines)) 

output_parser = LineListOutputParser()

llm = ChatGoogleGenerativeAI(
  model="gemini-2.5-flash",
  google_api_key=os.getenv("GEMINI_API_KEY"),
  temperature=0,
  max_tokens=None,
  timeout=None,
  max_retries=2,
)

QUERY_REWRITE_PROMPT = PromptTemplate(
  input_variables=["question"],
  template="""You are an AI language model assistant. Your task is to generate five 
  different versions of the given user question to retrieve relevant documents from a vector 
  database. By generating multiple perspectives on the user question, your goal is to help
  the user overcome some of the limitations of the distance-based similarity search. 
  Provide these alternative questions separated by newlines.
  Original question: {question}""",
)

llm_chain = QUERY_REWRITE_PROMPT | llm | output_parser

# ----- Step 1: Generate Multiple Queries -----
user_query = "What is FS Module?"
generated_queries = llm_chain.invoke(user_query)

print("--------- Generated Queries -----------")
for i, query in enumerate(generated_queries, 1):
  print(f"{i}. {query}")
print("------------------------")
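
One thing to watch for: LLMs often return the alternative questions as a numbered or bulleted list even when asked for plain lines, and those markers would then be embedded along with the query text. Below is a minimal sketch of a stricter line cleaner, assuming markers such as `1.`, `2)`, `-`, `*` or `•`. The helper name `clean_query_lines` is our own and is not part of LangChain.

```python
import re
from typing import List

def clean_query_lines(text: str) -> List[str]:
    """Split LLM output into queries, dropping blank lines and list markers."""
    queries = []
    for line in text.splitlines():
        # Remove a leading "1.", "2)", "-", "*" or "•" marker plus whitespace
        cleaned = re.sub(r"^\s*(?:\d+[.)]|[-*•])\s*", "", line).strip()
        if cleaned:
            queries.append(cleaned)
    return queries

raw = (
    "1. What is the File System (FS) module?\n"
    "\n"
    "2) Describe the Node.js FS module.\n"
    "- How does the FS module work?"
)
for query in clean_query_lines(raw):
    print(query)
```

The same logic could be placed inside the `parse` method of `LineListOutputParser` in place of the plain `split("\n")`.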

Step 2: Fetching relevant chunks from the vector store in parallel

Once we have generated multiple queries, we embed each query and retrieve the relevant chunks from the vector store by performing a similarity search. This is done for all the generated queries in parallel.

import os
from dotenv import load_dotenv
import concurrent.futures
from typing import Dict, List

from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
from langchain_google_genai import GoogleGenerativeAIEmbeddings

load_dotenv()

embeddings = GoogleGenerativeAIEmbeddings(
  model="models/gemini-embedding-001",
  google_api_key=os.getenv("GEMINI_API_KEY")
)

COLLECTION_NAME = "nodejs_document"

def get_qdrant_client():
  client = QdrantClient(url="http://localhost:6333")

  if COLLECTION_NAME not in [c.name for c in client.get_collections().collections]:
    client.create_collection(
      collection_name=COLLECTION_NAME,
      vectors_config=VectorParams(size=3072, distance=Distance.COSINE),
    )

  return client

def get_vector_store():
    client = get_qdrant_client()
    return QdrantVectorStore(
        client=client,
        collection_name=COLLECTION_NAME,
        embedding=embeddings,
    )

qdrant = get_vector_store()

# ----- Fetch Documents in Parallel -----
def fetch_docs(query):
  docs = qdrant.as_retriever().invoke(query)
  return (query, docs)

query_to_docs: Dict[str, List] = {}

# Use ThreadPoolExecutor to run in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
  results = list(executor.map(fetch_docs, generated_queries))

# Map results
for query, docs in results:
  query_to_docs[query] = docs

for query, docs in query_to_docs.items():
  print(f"\n===== Results for Query: \"{query}\" =====")
  for i, doc in enumerate(docs, 1):
    # Single quotes inside the f-string: nested double quotes fail before Python 3.12
    print(f"Doc {i} ID: {doc.metadata['_id']}")
  print("===============================")
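
To see the fan-out pattern in isolation, here is a small sketch that swaps the vector store for an in-memory dictionary. `FAKE_INDEX`, `fake_search` and `fan_out` are hypothetical names used only for this illustration. Two properties of `executor.map` are worth knowing: it yields results in the same order as the input queries, and it re-raises any exception from a worker when that result is read.

```python
import concurrent.futures
from typing import Callable, Dict, List

def fan_out(
    queries: List[str],
    search: Callable[[str], List[str]],
    max_workers: int = 5,
) -> Dict[str, List[str]]:
    """Run `search` for every query in a thread pool and map query -> results."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Results come back in the same order as `queries`
        results = list(executor.map(search, queries))
    return dict(zip(queries, results))

# Hypothetical stand-in for a vector store lookup
FAKE_INDEX = {
    "what is fs module": ["chunk-1"],
    "what is the file system module": ["chunk-1", "chunk-2"],
}

def fake_search(query: str) -> List[str]:
    return FAKE_INDEX.get(query, [])

hits = fan_out(list(FAKE_INDEX), fake_search)
for query, chunks in hits.items():
    print(query, "->", chunks)
```

Threads suit this step because each search spends most of its time waiting on the network, not on the CPU.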

Step 3: Filter the unique documents

From all the documents fetched by all the queries, we keep only the unique ones to avoid duplicates.

# ----- Deduplicate Documents -----
# Strategy: use doc.metadata['_id'] (the Qdrant point ID) if available,
# else fall back to a hash of the page content

def get_doc_id(doc):
  return doc.metadata.get("_id") or hash(doc.page_content.strip())

seen = {}
for docs in query_to_docs.values():
  for doc in docs:
    doc_id = get_doc_id(doc)
    if doc_id not in seen:
      seen[doc_id] = doc

unique_docs = list(seen.values())

print(f"\nTotal Unique documents: {len(unique_docs)}")
print("===============================")
for i, doc in enumerate(unique_docs, 1):
  # Single quotes inside the f-string: nested double quotes fail before Python 3.12
  print(f"Doc {i} ID: {doc.metadata['_id']}")
print("===============================")
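
The deduplication step can also be tried without a vector store. The sketch below is one possible variant, assuming the point ID sits under the `_id` metadata key (the same key the print statements above read). The `Doc` class is a hypothetical stand-in for a LangChain `Document`. It uses `hashlib.sha256` for the fallback key because Python's built-in `hash()` for strings is salted per process, so its values cannot be compared across runs.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Iterable, List

@dataclass
class Doc:
    """Hypothetical stand-in for a LangChain Document."""
    page_content: str
    metadata: Dict[str, str] = field(default_factory=dict)

def doc_key(doc: Doc) -> str:
    """Prefer the stored point ID; otherwise hash the stripped text."""
    doc_id = doc.metadata.get("_id")
    if doc_id:
        return f"id:{doc_id}"
    digest = hashlib.sha256(doc.page_content.strip().encode("utf-8")).hexdigest()
    return f"sha256:{digest}"

def dedupe(docs: Iterable[Doc]) -> List[Doc]:
    """Keep the first occurrence of each document, preserving order."""
    seen: Dict[str, Doc] = {}
    for doc in docs:
        seen.setdefault(doc_key(doc), doc)
    return list(seen.values())

docs = [
    Doc("fs reads files", {"_id": "a1"}),
    Doc("fs reads files", {"_id": "a1"}),   # same ID: dropped
    Doc("path joins segments "),            # no ID: keyed by content
    Doc("path joins segments"),             # same content after strip: dropped
]
print(len(dedupe(docs)))
```

Keeping the first occurrence preserves the order in which chunks were retrieved.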

Step 4: Generating output

Using the retrieved unique documents and the user's original query, we generate the output, which will be more accurate.

import os
from openai import OpenAI

from retriever.retrival import parallel_query_retriver

api_key = os.getenv("GEMINI_API_KEY")

client = OpenAI(
  api_key=api_key,
  base_url="https://generativelanguage.googleapis.com/v1beta/openai/"
)

def llm_chat(query: str):
  unique_docs = parallel_query_retriver(query)

  context = "\n\n"
  for doc in unique_docs:
    # print(doc)
    context += f"Page Content: {doc.page_content}\nPage Number: {doc.metadata['page_label']}\nFile Location: {doc.metadata['source']} \n\n"


  SYSTEM_PROMPT = f"""
    You are a helpful AI assistant who answers user query based on the available context retrieved from a PDF file along with page_contents and page number.

    You should only answer the user based on the following context and navigate the user to open the right page number to know more.
    answer should be in details.
    Context:
    {context}
  """

  chat_completion = client.chat.completions.create(
    model="gemini-2.0-flash",
    messages=[
      {"role": "system", "content": SYSTEM_PROMPT},
      {"role": "user", "content": query}
    ],
  )

  return chat_completion.choices[0].message.content
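
The context-building loop reads `page_label` and `source` directly from the metadata, which raises a `KeyError` if a chunk lacks either key. A minimal sketch of a more forgiving formatter is shown below. `build_context` is a hypothetical helper, the `"unknown"` fallback is our own choice, and `SimpleNamespace` objects stand in for LangChain documents.

```python
from types import SimpleNamespace
from typing import Any, Iterable

def build_context(docs: Iterable[Any]) -> str:
    """Format retrieved documents into one context string for the prompt."""
    parts = []
    for doc in docs:
        # Fall back to "unknown" when a chunk lacks page or source metadata
        page = doc.metadata.get("page_label", "unknown")
        source = doc.metadata.get("source", "unknown")
        parts.append(
            f"Page Content: {doc.page_content}\n"
            f"Page Number: {page}\n"
            f"File Location: {source}"
        )
    return "\n\n".join(parts)

# Hypothetical stand-ins for LangChain Document objects
docs = [
    SimpleNamespace(
        page_content="fs reads files",
        metadata={"page_label": "12", "source": "nodejs.pdf"},
    ),
    SimpleNamespace(page_content="path joins segments", metadata={}),
]
print(build_context(docs))
```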

Benefits of Parallel Query

Even though Parallel Query adds some overhead and an extra LLM call, its improved accuracy usually outweighs the extra cost. A Parallel Query retrieval system can handle ambiguous user queries, and typos or small mistakes in the user query are less likely to affect the final output.

Source Code

retrival.py

import os
import concurrent.futures
from typing import List, Set

from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.documents import Document

from utils.output_parser import output_parser
from config.vector_store import get_vector_store
from llm.prompt_templates import QUERY_REWRITE_PROMPT

# Load GEMINI_API_KEY from .env before it is read below
load_dotenv()

# ----- setup --------

qdrant = get_vector_store()

llm = ChatGoogleGenerativeAI(
  model="gemini-2.5-flash",
  google_api_key=os.getenv("GEMINI_API_KEY"),
  temperature=0,
  max_tokens=None,
  timeout=None,
  max_retries=2,
)

llm_chain = QUERY_REWRITE_PROMPT | llm | output_parser

def parallel_query_retriver(
  user_query: str, 
  llm_chain=llm_chain, 
  retriever=qdrant.as_retriever()
) -> List[Document]:
  """
  Generate multiple queries using llm_chain, fetch documents using retriever,
  and return deduplicated list of documents.

  Args:
    user_query (str): The input user query.
    llm_chain: A Runnable (PromptTemplate | LLM | OutputParser) for generating queries.
    retriever: A LangChain retriever (e.g., qdrant.as_retriever()).

  Returns:
    List[Document]: Deduplicated retrieved documents across all queries.
  """
  # Step 1: Generate multiple queries
  generated_queries = llm_chain.invoke(user_query)

  # Step 2: Fetch documents in parallel
  def fetch_docs(query: str):
    return retriever.invoke(query)

  all_docs: List[Document] = []
  with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(fetch_docs, generated_queries)
    for docs in results:
      all_docs.extend(docs)

  # Step 3: Deduplicate docs (Qdrant point ID if present, else content hash)
  seen: Set = set()
  unique_docs: List[Document] = []

  for doc in all_docs:
    doc_id = doc.metadata.get("_id") or hash(doc.page_content.strip())
    if doc_id not in seen:
      seen.add(doc_id)
      unique_docs.append(doc)

  return unique_docs

chat.py

import os
from openai import OpenAI

from retriever.retrival import parallel_query_retriver

api_key = os.getenv("GEMINI_API_KEY")

client = OpenAI(
  api_key=api_key,
  base_url="https://generativelanguage.googleapis.com/v1beta/openai/"
)

def llm_chat(query: str):
  search_results =  parallel_query_retriver(query)

  context = "\n\n"
  for doc in search_results:
    # print(doc)
    context += f"Page Content: {doc.page_content}\nPage Number: {doc.metadata['page_label']}\nFile Location: {doc.metadata['source']} \n\n"


  SYSTEM_PROMPT = f"""
    You are a helpful AI assistant who answers user query based on the available context retrieved from a PDF file along with page_contents and page number.

    You should only answer the user based on the following context and navigate the user to open the right page number to know more.
    answer should be in details.
    Context:
    {context}
  """

  chat_completion = client.chat.completions.create(
    model="gemini-2.0-flash",
    messages=[
      {"role": "system", "content": SYSTEM_PROMPT},
      {"role": "user", "content": query}
    ],
  )

  return chat_completion.choices[0].message.content

main.py

from dotenv import load_dotenv

# Load .env first: generator.chat reads GEMINI_API_KEY at import time
load_dotenv()

from generator.chat import llm_chat

def main():
  query = "what is fs module?"
  result = llm_chat(query=query)

  print(result)

if __name__ == "__main__":
  main()

Full source code available at: GitHub
