Parallel Query Retrieval (Advanced RAG technique)

Parallel query retrieval, also known as fan-out retrieval, is a method of improving retrieval by expanding the user's single query into multiple related queries and collecting the relevant chunks for each of them.

In this technique, we take the user's input prompt and pass it to an LLM to generate queries that are semantically similar to the original. Once the queries are generated (let us assume 3 queries), we run each of those 3 queries against the vector database separately and retrieve the most relevant chunks for each one. For example, "what is the fs module in node js" might fan out into "how does the Node.js file system module work" and "which functions does the fs module provide".

From the chunks retrieved across all the queries, we keep only the unique ones. We then pass those unique chunks, together with the original question, to the LLM to generate the final answer.
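The script below walks through the whole pipeline with LangChain, Qdrant, and a local Hugging Face model: it ingests a PDF, generates query variations, searches the collection in parallel, and deduplicates the results.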

from pathlib import Path
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_qdrant import QdrantVectorStore
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor

load_dotenv()

pdf_path = Path(__file__).parent / "nodejs.pdf"

loader = PyPDFLoader(file_path=pdf_path)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
)

split_docs = text_splitter.split_documents(documents=docs)

embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Embed the chunks and store them in a local Qdrant collection.
vector_store = QdrantVectorStore.from_documents(
    documents=split_docs,
    url="http://localhost:6333",
    collection_name="learning_langchain",
    embedding=embedder
)
print("Ingestion Done")

retriever = QdrantVectorStore.from_existing_collection(
    url="http://localhost:6333",
    collection_name="learning_langchain",
    embedding=embedder
)

def generate_variations(prompt: str) -> list:
    # Ask a small local text2text model to rewrite the user query in a few different ways.
    from transformers import pipeline
    generator = pipeline("text2text-generation", model="google/flan-t5-base")
    prompt_template = f"Generate 3 semantically similar versions of this query:\n'{prompt}'"
    result = generator(prompt_template, max_length=64, num_return_sequences=1)[0]["generated_text"]
    # Split the model output into individual queries and keep at most 3 of them.
    variations = [line.strip("- ") for line in result.strip().split("\n") if line.strip()]
    return variations[:3]

def deduplicate_chunks(chunks):
    # Keep only the first occurrence of each chunk, using its text as the identity.
    seen = set()
    unique = []
    for doc in chunks:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique.append(doc)
    return unique

def parallel_similarity_search(queries):
    # Run the similarity search for every query concurrently, then merge and deduplicate.
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(retriever.similarity_search, queries))
    flat_results = [doc for sublist in results for doc in sublist]
    return deduplicate_chunks(flat_results)

def retrieve_relevant_chunks(user_prompt: str):
    # Fan the original prompt out into similar queries and search with all of them.
    similar_prompts = generate_variations(user_prompt)
    all_prompts = [user_prompt] + similar_prompts
    results = parallel_similarity_search(all_prompts)
    print("Final Relevant Chunks:")
    for doc in results:
        print(doc.page_content.strip())
    return results

relevant_chunks = retrieve_relevant_chunks("what is the fs module in node js?")
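The script above stops at retrieval. The final step described earlier, passing the merged chunks to an LLM for answer generation, could look roughly like this; reusing flan-t5-base and the prompt format below are my assumptions, and since the model has a small context window only a few chunks are stuffed into it:

from transformers import pipeline

def generate_answer(user_prompt: str, chunks) -> str:
    # flan-t5-base has a small context window, so only the first few chunks are included.
    context = "\n\n".join(doc.page_content for doc in chunks[:3])
    generator = pipeline("text2text-generation", model="google/flan-t5-base")
    final_prompt = (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {user_prompt}\nAnswer:"
    )
    return generator(final_prompt, max_length=256)[0]["generated_text"]

print(generate_answer("what is the fs module in node js?", relevant_chunks))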

One Question:

If one chunk is returned by every fanned-out query, while other chunks are returned by only one or two of them, should we give more priority to the chunk that shows up everywhere?
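One way to explore this (just a sketch, not part of the pipeline above; the function name and input shape are made up) is to count how many of the fanned-out queries returned each chunk before deduplicating, and sort by that count:

from collections import Counter

def rank_by_query_frequency(per_query_results):
    # per_query_results: one list of documents per query (the `results` list
    # inside parallel_similarity_search before it is flattened).
    counts = Counter()
    docs_by_content = {}
    for docs in per_query_results:
        # Count each chunk at most once per query.
        for content in {d.page_content for d in docs}:
            counts[content] += 1
        for d in docs:
            docs_by_content.setdefault(d.page_content, d)
    # Chunks retrieved by more queries come first.
    return [docs_by_content[c] for c, _ in counts.most_common()]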

Give your opinions in comments😊!!
