Accelerating Document Embedding Generation with Ray, FastEmbed, and Qdrant

yash bhaskar

For medium and large businesses, extracting meaningful insights from large volumes of unstructured data, such as text documents, is crucial. However, generating embeddings by processing documents one at a time is slow, especially when dealing with thousands of documents.

In this article, we explore an efficient solution using Ray for parallelization, FastEmbed for faster embedding generation, and Qdrant for efficient storage and retrieval of embeddings.

Code Source: https://github.com/yash9439/RayQdrantFastEmbed

Throughout this article, we experiment with the following five PDFs:

  1. Apple_Environmental_Progress_Report_2022.pdf : https://www.apple.com/environment/pdf/Apple_Environmental_Progress_Report_2022.pdf

  2. GPT4_TechnicalReport.pdf : https://cdn.openai.com/papers/gpt-4.pdf

  3. Model_Card_Claude_3.pdf : https://www-cdn.anthropic.com/de8ba9b01c9ab7cbabf5c33b80b7bbc618857627/Model_Card_Claude_3.pdf

  4. Palm2techreport.pdf : https://ai.google/static/documents/palm2techreport.pdf

  5. Mistral7b.pdf : https://arxiv.org/pdf/2310.06825.pdf

Together, the text from these five PDFs amounts to about 200K tokens.

Why Is Sequential Embedding Generation Time-Consuming?

Sequentially processing documents for embedding generation can be slow, particularly when dealing with many documents. Each document needs to be processed individually, resulting in increased processing time, especially when using complex embedding models.

Time taken to generate embeddings with neither FastEmbed nor Ray distributed computing: 1463.395 seconds

Using FastEmbed for Faster Embedding Generation

FastEmbed is a library designed for fast and efficient text embedding generation. By employing FastEmbed, we can optimize the embedding generation process, leading to faster execution times compared to traditional embedding methods. This acceleration is crucial when dealing with large-scale document processing tasks.

Time taken to generate embeddings with FastEmbed but without Ray distributed computing: 674.568 seconds
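
As a rough sketch of how FastEmbed is typically called: `TextEmbedding.embed` accepts a list of texts and lazily yields one vector per text, so a whole batch can be embedded in a single call. The helper name `embed_texts` and the batch size below are illustrative assumptions, not part of the benchmark above. The helper relies only on the model exposing an `embed(texts, batch_size=...)` method.

```python
from typing import List, Sequence


def embed_texts(model, texts: Sequence[str], batch_size: int = 256) -> List:
    """Embed all texts in one call and return one vector per text.

    `model` is any object with an `embed(texts, batch_size=...)` method that
    yields one vector per input text, for example fastembed.TextEmbedding.
    """
    if not texts:
        return []
    # embed() is lazy (it yields vectors), so materialize the results here.
    return list(model.embed(list(texts), batch_size=batch_size))


# Usage with FastEmbed (downloads the model on first run):
#   from fastembed import TextEmbedding
#   model = TextEmbedding(model_name="BAAI/bge-base-en", cache_dir="./embeddings")
#   vectors = embed_texts(model, ["first sentence", "second sentence"])
```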

Using Ray for Parallelization

Ray is a powerful framework for building distributed applications. By utilizing Ray’s parallel processing capabilities, we can distribute the task of embedding generation across multiple workers, significantly reducing the overall processing time. In our implementation, we leverage Ray to parallelize the embedding generation process for thousands of documents.

Time taken to generate embeddings with both FastEmbed and Ray distributed computing: 195.922 seconds

Step-by-Step Guide to Using Ray, FastEmbed, and Qdrant

  • Installing Dependencies:
pip install "qdrant-client[fastembed]"
pip install fastembed
pip install PyPDF2 nltk numpy
pip install -U "ray[data,train,tune,serve]"
  • Setting Up Qdrant: We start by setting up Qdrant, a high-performance vector database, to store and retrieve embeddings efficiently.
sudo docker pull qdrant/qdrant
sudo docker run -p 6333:6333 -p 6334:6334 \
 -v $(pwd)/qdrant_storage:/qdrant/storage:z \
 qdrant/qdrant
  • Document Preprocessing: We preprocess the documents by extracting and tokenizing text from PDF files into sentences.
import os
from PyPDF2 import PdfReader

def extract_text_from_pdf(pdf_path):
    """Return the text of every page in the PDF, one page per line break."""
    reader = PdfReader(pdf_path)
    # Guard against pages that yield no text (for example, image-only pages)
    return "\n".join(page.extract_text() or "" for page in reader.pages)

def extract_text_from_pdfs_in_directory(directory):
    """Write a .txt file next to each PDF found in the directory."""
    for filename in os.listdir(directory):
        if filename.lower().endswith(".pdf"):
            pdf_path = os.path.join(directory, filename)
            extracted_text = extract_text_from_pdf(pdf_path)
            txt_filename = os.path.splitext(filename)[0] + ".txt"
            txt_filepath = os.path.join(directory, txt_filename)
            with open(txt_filepath, "w", encoding="utf-8") as txt_file:
                txt_file.write(extracted_text)

# Specify the directory containing PDF files
directory_path = "Docs/"

# Extract text from PDFs in the directory and save as text files
extract_text_from_pdfs_in_directory(directory_path)

# Split the extracted text files into sentences
import os
import nltk
from nltk.tokenize import sent_tokenize

# sent_tokenize needs tokenizer data; newer NLTK releases may ask for "punkt_tab" instead
nltk.download("punkt", quiet=True)

directory_path = "Docs"

# List all .txt files in the directory (sorted so sentence order is reproducible)
txt_files = sorted(file for file in os.listdir(directory_path) if file.endswith('.txt'))

# List to store sentences from all files
all_sentences = []

# Read each text file, split into sentences, and store
for txt_file in txt_files:
    file_path = os.path.join(directory_path, txt_file)
    with open(file_path, "r", encoding="utf-8") as file:
        text = file.read()
    all_sentences.extend(sent_tokenize(text))

# Print the first few sentences as an example
print(all_sentences[:10])  # Print first 10 sentences
  • Parallel Embedding Generation with Ray: Using Ray, we parallelize the embedding generation process across multiple workers, each responsible for processing a chunk of documents.
import ray
from fastembed import TextEmbedding
from typing import List
import numpy as np
import time

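# Instantiating the model once here downloads it into cache_dir,
# so the workers below can load it from disk.
TextEmbedding(model_name="BAAI/bge-base-en", cache_dir="./embeddings")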

ray.init(ignore_reinit_error=True)

@ray.remote
class EmbeddingWorker:
    def __init__(self):
        self.embedding_model = TextEmbedding(model_name="BAAI/bge-base-en", cache_dir="./embeddings")

    def embed_documents(self, documents):
        embeddings = []
        for document in documents:
            embeddings.append(np.array(list(self.embedding_model.embed([document]))))
        return embeddings

# Define the number of workers
num_workers = 4  # Adjust this according to your resources
documents = all_sentences

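# Split documents into chunks for each worker. Ceiling division yields at most
# num_workers chunks, so zip() below does not silently drop a trailing chunk.
chunk_size = max(1, -(-len(documents) // num_workers))
document_chunks = [documents[i:i+chunk_size] for i in range(0, len(documents), chunk_size)]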

# Start the workers
embedding_workers = [EmbeddingWorker.remote() for _ in range(num_workers)]

# Perform embedding generation in parallel
start_time = time.time()
embedding_tasks = [worker.embed_documents.remote(chunk) for worker, chunk in zip(embedding_workers, document_chunks)]
embeddings = ray.get(embedding_tasks)
end_time = time.time()

# Flatten the embeddings list
embeddings = [embedding for sublist in embeddings for embedding in sublist]

print("Time taken to generate embeddings with Ray Distributed Computing:", end_time - start_time, "seconds")

# Shutdown Ray
ray.shutdown()
# Each entry has shape (1, 768); keep the single 768-dimensional vector
embeddings = [sublist[0] for sublist in embeddings]
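
The chunking step deserves care: with floor division, a document count that is not a multiple of the worker count produces one more chunk than there are workers, and `zip` pairs only as many chunks as there are workers. A minimal sketch of a splitter that never exceeds the requested number of chunks (the helper name `split_into_chunks` is hypothetical):

```python
from typing import List, Sequence


def split_into_chunks(items: Sequence, num_chunks: int) -> List[Sequence]:
    """Split items into at most num_chunks contiguous chunks, dropping nothing."""
    if num_chunks < 1:
        raise ValueError("num_chunks must be at least 1")
    if not items:
        return []
    chunk_size = -(-len(items) // num_chunks)  # ceiling division
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


chunks = split_into_chunks(list(range(10)), 4)
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

Because the chunk size is rounded up, the last chunk may be shorter than the others, and fewer than `num_chunks` chunks may be returned for small inputs.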
  • Storing Embeddings in Qdrant: Once the embeddings are generated, we store them in Qdrant for fast and efficient retrieval.
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from qdrant_client.models import PointStruct

# client = QdrantClient(path="./DB")
client = QdrantClient("localhost", port=6333)
collection_name = 'fastembed_collection'
client.recreate_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(size=768, distance=Distance.COSINE),
)

client.upload_points(
    collection_name=collection_name,
    points=[
        PointStruct(
            id=idx,
            vector=vector.tolist(),
            payload={"color": "red", "rand_number": idx % 10}
        )
        for idx, vector in enumerate(embeddings)
    ]
)
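
The payload above is placeholder metadata. One option, sketched here as an assumption rather than the author's method, is to store each sentence in the payload so that search results carry their own text. The helper `build_point_records` and the payload key `text` are hypothetical names chosen for illustration.

```python
from typing import Dict, List, Sequence


def build_point_records(sentences: Sequence[str],
                        vectors: Sequence[Sequence[float]]) -> List[Dict]:
    """Pair each sentence with its vector as id/vector/payload dictionaries."""
    if len(sentences) != len(vectors):
        raise ValueError("sentences and vectors must have the same length")
    return [
        {
            "id": idx,
            "vector": [float(x) for x in vector],
            "payload": {"text": sentence},  # hypothetical payload key
        }
        for idx, (sentence, vector) in enumerate(zip(sentences, vectors))
    ]


records = build_point_records(["Hello world."], [[0.1, 0.2, 0.3]])
print(records[0]["payload"])  # {'text': 'Hello world.'}
```

Each record has the `id`, `vector`, and `payload` fields that `PointStruct` takes, so `PointStruct(**record)` could stand in for the points built in the upload call.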
  • Retrieving the top-k chunks for n queries: Once the embeddings are stored in Qdrant, we embed the queries in parallel and retrieve the closest chunks for each query.
import time
from fastembed import TextEmbedding
import ray 
import numpy as np

ray.init(ignore_reinit_error=True)

@ray.remote
class EmbeddingWorker:
    def __init__(self):
        self.embedding_model = TextEmbedding(model_name="BAAI/bge-base-en", cache_dir="./embeddings")

    def embed_documents(self, documents):
        embeddings = []
        for document in documents:
            embeddings.append(np.array(list(self.embedding_model.embed([document]))))
        return embeddings

# Define the number of workers
num_workers = 2  # Adjust this according to your resources
documents = ["Can AI Models be hacked?","How to secure AI models?"]

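# Split documents into chunks for each worker. Ceiling division yields at most
# num_workers chunks, so zip() below does not silently drop a trailing chunk.
chunk_size = max(1, -(-len(documents) // num_workers))
document_chunks = [documents[i:i+chunk_size] for i in range(0, len(documents), chunk_size)]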

# Start the workers
embedding_workers = [EmbeddingWorker.remote() for _ in range(num_workers)]

# Perform embedding generation in parallel
start_time = time.time()
embedding_tasks = [worker.embed_documents.remote(chunk) for worker, chunk in zip(embedding_workers, document_chunks)]
embeddings = ray.get(embedding_tasks)
end_time = time.time()

# Flatten the embeddings list
embeddings = [embedding for sublist in embeddings for embedding in sublist]

print("Time taken to generate embeddings with Ray Distributed Computing:", end_time - start_time, "seconds")

# Shutdown Ray
ray.shutdown()
query_embeddings = [sublist[0] for sublist in embeddings]

from qdrant_client import QdrantClient

client = QdrantClient("localhost", port=6333)

collection_name = 'fastembed_collection'
for query_embedding in query_embeddings:
    # Convert the NumPy vector to a plain list of floats for the client
    query_vector = query_embedding.tolist()
    hits = client.search(
        collection_name=collection_name,
        query_vector=query_vector,
        limit=8
    )
    print(hits)

    # Point ids are the all_sentences indices used at upload time.
    # Iterate over the hits themselves, since fewer than 8 may be returned.
    for hit in hits:
        print(all_sentences[hit.id])
        print("-------------")
    print("=====================================")
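
Conceptually, the search ranks the stored vectors by cosine similarity to the query and returns the `limit` best matches. The brute-force sketch below illustrates that ranking in plain Python. It is not how Qdrant is implemented (Qdrant uses an approximate nearest-neighbor index rather than scanning every vector), and the function names are illustrative.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between a and b; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: Sequence[float],
          vectors: Sequence[Sequence[float]],
          k: int) -> List[Tuple[int, float]]:
    """Return (index, score) pairs for the k vectors most similar to query."""
    scored = [(idx, cosine_similarity(query, v)) for idx, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


stored = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
best = top_k([1.0, 0.1], stored, k=2)
print([idx for idx, _ in best])  # [0, 2]
```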

Results: Sequential vs. Parallel Embedding Generation

We compare the performance of sequential and parallel embedding generation approaches. Through empirical analysis, we demonstrate the significant reduction in processing time achieved by parallelizing the task using Ray and optimizing embedding generation with FastEmbed.

Time taken to generate embeddings with neither FastEmbed nor Ray distributed computing: 1463.395 seconds

Time taken to generate embeddings with FastEmbed but without Ray distributed computing: 674.568 seconds

Time taken to generate embeddings with both FastEmbed and Ray distributed computing: 195.922 seconds
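
To put these timings in relative terms, the short calculation below derives the speedup factors from the three measurements reported in this article. The dictionary keys are labels chosen for illustration.

```python
# Timings reported in this article, in seconds
timings = {
    "baseline": 1463.395,       # neither FastEmbed nor Ray
    "fastembed": 674.568,       # FastEmbed only
    "fastembed_ray": 195.922,   # FastEmbed and Ray
}


def speedup(slow: float, fast: float) -> float:
    """How many times faster the `fast` run is than the `slow` run."""
    return slow / fast


fastembed_gain = speedup(timings["baseline"], timings["fastembed"])
ray_gain = speedup(timings["fastembed"], timings["fastembed_ray"])
total_gain = speedup(timings["baseline"], timings["fastembed_ray"])

print(f"FastEmbed vs. baseline:       {fastembed_gain:.1f}x")  # 2.2x
print(f"Ray on top of FastEmbed:      {ray_gain:.1f}x")        # 3.4x
print(f"FastEmbed + Ray vs. baseline: {total_gain:.1f}x")      # 7.5x
```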

Conclusion

In conclusion, we have shown how the combination of Ray, FastEmbed, and Qdrant can accelerate the embedding generation process for thousands of documents. By leveraging parallel processing and efficient embedding techniques, organisations can achieve faster insights from their unstructured data, leading to improved decision-making and enhanced productivity.

Thank you for reading. I hope you found this article useful.

Connect with me : Linkedin | Github | Medium | Hashnode : Email

Code: https://github.com/yash9439/RayQdrantFastEmbed
