Accelerating Document Embedding Generation with Ray, FastEmbed, and Qdrant
For medium and large businesses, extracting meaningful insights from large volumes of unstructured data, such as text documents, is crucial. However, the traditional approach of generating embeddings for documents one at a time can be painfully slow, especially when dealing with thousands of documents.
In this article, we explore an efficient solution using Ray for parallelization, FastEmbed for faster embedding generation, and Qdrant for efficient storage and retrieval of embeddings.
Code Source: Github Link
Throughout this article, we will experiment with five PDFs:
Apple_Environmental_Progress_Report_2022.pdf : https://www.apple.com/environment/pdf/Apple_Environmental_Progress_Report_2022.pdf
GPT4_TechnicalReport.pdf : https://cdn.openai.com/papers/gpt-4.pdf
Model_Card_Claude_3.pdf : https://www-cdn.anthropic.com/de8ba9b01c9ab7cbabf5c33b80b7bbc618857627/Model_Card_Claude_3.pdf
Palm2techreport.pdf : https://ai.google/static/documents/palm2techreport.pdf
Mistral7b.pdf : https://arxiv.org/pdf/2310.06825.pdf
The text extracted from these five PDFs amounts to roughly 200K tokens.
Why Is Sequential Embedding Generation Time-Consuming?
Sequentially processing documents for embedding generation can be slow, particularly when dealing with many documents. Each document must pass through the model one at a time, so total runtime grows linearly with the number of documents and is further inflated when the embedding model itself is expensive to run.
Time taken to generate embeddings without FastEmbed or Ray distributed computing: 1463.395 seconds
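The baseline code behind this figure is not shown in the article. As a rough illustration of what a purely sequential pipeline looks like, here is a minimal sketch, assuming the sentence-transformers implementation of the same bge-base-en model (the actual baseline may have differed):

from sentence_transformers import SentenceTransformer

# Hypothetical sequential baseline: one encode() call per sentence,
# with no batching and no parallelism
model = SentenceTransformer("BAAI/bge-base-en")
sequential_embeddings = [model.encode(sentence) for sentence in all_sentences]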
Using FastEmbed for Faster Embedding Generation
FastEmbed is a library designed for fast and efficient text embedding generation. Under the hood it runs quantized ONNX versions of popular embedding models, which avoids the overhead of a full deep-learning framework and leads to faster execution times than traditional embedding methods. This acceleration is crucial when dealing with large-scale document processing tasks.
Time taken to generate embeddings with FastEmbed but without Ray distributed computing: 674.568 seconds
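For context, single-process FastEmbed usage looks like this (a minimal sketch using the same model as the rest of the article):

from fastembed import TextEmbedding

# embed() returns a generator, so materialize it with list();
# bge-base-en yields one 768-dimensional vector per input sentence
model = TextEmbedding(model_name="BAAI/bge-base-en", cache_dir="./embeddings")
embeddings = list(model.embed(all_sentences))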
Using Ray for Parallelization
Ray is a powerful framework for building distributed applications. By utilizing Ray’s parallel processing capabilities, we can distribute the task of embedding generation across multiple workers, significantly reducing the overall processing time. In our implementation, we leverage Ray to parallelize the embedding generation process for thousands of documents.
Time taken to generate embeddings with both FastEmbed and Ray distributed computing: 195.922 seconds
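The core Ray pattern used throughout this article is simple: decorate a function or class with @ray.remote, call .remote() to schedule work asynchronously, and call ray.get() to collect the results. A minimal, self-contained sketch:

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def square(x):
    return x * x

# .remote() returns futures immediately; ray.get() blocks until they resolve
futures = [square.remote(i) for i in range(4)]
print(ray.get(futures))  # [0, 1, 4, 9]

ray.shutdown()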
Step-by-Step Guide to Using Ray, FastEmbed, and Qdrant
- Installing Dependencies: The extraction code below uses PyPDF2 and NLTK, so we install those alongside the embedding and serving libraries. Quote the bracketed extras so the shell does not try to expand them:
pip install "qdrant-client[fastembed]"
pip install fastembed
pip install PyPDF2 nltk
pip install -U "ray[data,train,tune,serve]"
- Setting Up Qdrant: We start by setting up Qdrant, a high-performance vector database, to store and retrieve embeddings efficiently; a quick connectivity check follows the docker commands below.
sudo docker pull qdrant/qdrant
sudo docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/qdrant_storage:/qdrant/storage:z \
qdrant/qdrant
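Once the container is running, a quick sanity check (not in the original article) confirms the client can reach it:

from qdrant_client import QdrantClient

# Should print an (initially empty) list of collections if Qdrant is reachable
client = QdrantClient("localhost", port=6333)
print(client.get_collections())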
- Document Preprocessing: We preprocess the documents by extracting and tokenizing text from PDF files into sentences.
import os
from PyPDF2 import PdfReader

def extract_text_from_pdf(pdf_path):
    """Extract the text from every page of a single PDF."""
    reader = PdfReader(pdf_path)
    extracted_text = ""
    for page in reader.pages:
        # extract_text() can return None for image-only pages
        extracted_text += page.extract_text() or ""
    return extracted_text

def extract_text_from_pdfs_in_directory(directory):
    """Convert every PDF in a directory into a sibling .txt file."""
    for filename in os.listdir(directory):
        if filename.endswith(".pdf"):
            pdf_path = os.path.join(directory, filename)
            extracted_text = extract_text_from_pdf(pdf_path)
            txt_filename = os.path.splitext(filename)[0] + ".txt"
            txt_filepath = os.path.join(directory, txt_filename)
            with open(txt_filepath, "w") as txt_file:
                txt_file.write(extracted_text)

# Specify the directory containing PDF files
directory_path = "Docs/"
# Extract text from PDFs in the directory and save as text files
extract_text_from_pdfs_in_directory(directory_path)
import os
import nltk
from nltk.tokenize import sent_tokenize

# The punkt tokenizer models are required by sent_tokenize
nltk.download("punkt")

directory_path = "Docs"

# List all .txt files in the directory
txt_files = [file for file in os.listdir(directory_path) if file.endswith('.txt')]

# Read each text file, split it into sentences, and collect them
all_sentences = []
for txt_file in txt_files:
    file_path = os.path.join(directory_path, txt_file)
    with open(file_path, "r") as file:
        text = file.read()
    sentences = sent_tokenize(text)
    all_sentences.extend(sentences)

# Print the first 10 sentences as a sanity check
print(all_sentences[:10])
- Parallel Embedding Generation with Ray: Using Ray, we parallelize the embedding generation process across multiple workers, each responsible for processing a chunk of documents.
import ray
from fastembed import TextEmbedding
import numpy as np
import time

# Download the model once up front so the workers read it from the
# local cache instead of each downloading it concurrently
TextEmbedding(model_name="BAAI/bge-base-en", cache_dir="./embeddings")

ray.init(ignore_reinit_error=True)

@ray.remote
class EmbeddingWorker:
    def __init__(self):
        self.embedding_model = TextEmbedding(model_name="BAAI/bge-base-en", cache_dir="./embeddings")

    def embed_documents(self, documents):
        embeddings = []
        for document in documents:
            # embed() returns a generator; materialize it as a (1, dim) array
            embeddings.append(np.array(list(self.embedding_model.embed([document]))))
        return embeddings

# Define the number of workers
num_workers = 4  # Adjust this according to your resources
documents = all_sentences

# Split documents into chunks, rounding the chunk size up so no
# sentences are dropped (plain integer division can leave a remainder
# chunk with no worker to process it)
chunk_size = -(-len(documents) // num_workers)  # ceiling division
document_chunks = [documents[i:i + chunk_size] for i in range(0, len(documents), chunk_size)]

# Start the workers
embedding_workers = [EmbeddingWorker.remote() for _ in range(num_workers)]

# Perform embedding generation in parallel
start_time = time.time()
embedding_tasks = [worker.embed_documents.remote(chunk)
                   for worker, chunk in zip(embedding_workers, document_chunks)]
embeddings = ray.get(embedding_tasks)
end_time = time.time()

# Flatten the per-worker lists into one list of (1, dim) arrays
embeddings = [embedding for sublist in embeddings for embedding in sublist]
print("Time taken to generate embeddings with Ray Distributed Computing:", end_time - start_time, "seconds")

# Shutdown Ray
ray.shutdown()

# Drop the extra leading dimension: each entry becomes a (dim,) vector
embeddings = [sublist[0] for sublist in embeddings]
- Storing Embeddings in Qdrant: Once the embeddings are generated, we store them in Qdrant for fast and efficient retrieval, verifying the upload with a quick count check afterwards.
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from qdrant_client.models import PointStruct

# Connect to the Qdrant container started earlier
# (use QdrantClient(path="./DB") instead for a local on-disk instance)
client = QdrantClient("localhost", port=6333)

collection_name = 'fastembed_collection'

# bge-base-en produces 768-dimensional vectors; cosine distance suits
# normalized sentence embeddings
client.recreate_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(size=768, distance=Distance.COSINE),
)

client.upload_points(
    collection_name=collection_name,
    points=[
        PointStruct(
            id=idx,
            vector=vector.tolist(),
            # Placeholder payload; in practice you would typically store
            # the sentence itself, e.g. {"text": all_sentences[idx]}
            payload={"color": "red", "rand_number": idx % 10}
        )
        for idx, vector in enumerate(embeddings)
    ]
)
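As a final sanity check (not part of the original article), we can confirm that every vector landed in the collection:

# The count should equal len(embeddings)
print(client.count(collection_name=collection_name, exact=True))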
- Retrieving the top-k chunks for n queries: Once the Qdrant collection is populated, we can embed multiple queries in parallel and retrieve the most similar chunks for each.
import time
from fastembed import TextEmbedding
import ray
import numpy as np

ray.init(ignore_reinit_error=True)

@ray.remote
class EmbeddingWorker:
    def __init__(self):
        self.embedding_model = TextEmbedding(model_name="BAAI/bge-base-en", cache_dir="./embeddings")

    def embed_documents(self, documents):
        embeddings = []
        for document in documents:
            embeddings.append(np.array(list(self.embedding_model.embed([document]))))
        return embeddings

# Define the number of workers
num_workers = 2  # Adjust this according to your resources
documents = ["Can AI Models be hacked?", "How to secure AI models?"]

# Split the queries into chunks, rounding up so none are dropped
chunk_size = -(-len(documents) // num_workers)
document_chunks = [documents[i:i + chunk_size] for i in range(0, len(documents), chunk_size)]

# Start the workers
embedding_workers = [EmbeddingWorker.remote() for _ in range(num_workers)]

# Embed the queries in parallel
start_time = time.time()
embedding_tasks = [worker.embed_documents.remote(chunk)
                   for worker, chunk in zip(embedding_workers, document_chunks)]
embeddings = ray.get(embedding_tasks)
end_time = time.time()

# Flatten the per-worker lists
embeddings = [embedding for sublist in embeddings for embedding in sublist]
print("Time taken to generate embeddings with Ray Distributed Computing:", end_time - start_time, "seconds")

# Shutdown Ray
ray.shutdown()

# Drop the extra leading dimension to get one (dim,) vector per query
query_embeddings = [sublist[0] for sublist in embeddings]
from typing import List
from qdrant_client import QdrantClient

client = QdrantClient("localhost", port=6333)
collection_name = 'fastembed_collection'

for query_embedding in query_embeddings:
    # Qdrant expects a plain list of floats as the query vector
    query_vector: List[float] = query_embedding.tolist()
    hits = client.search(
        collection_name=collection_name,
        query_vector=query_vector,
        limit=8
    )
    print(hits)
    # Point ids were assigned from the sentence index, so each hit
    # maps straight back to its original sentence
    for hit in hits:
        print(all_sentences[hit.id])
        print("-------------")
    print("=====================================")
Results: Sequential vs. Parallel Embedding Generation
We compare the performance of the sequential and parallel embedding generation approaches. The numbers below show the reduction in processing time achieved by optimizing embedding generation with FastEmbed and parallelizing the task with Ray: FastEmbed alone cuts the runtime by roughly 2.2×, and adding Ray brings the overall speedup to about 7.5× over the sequential baseline.
Time taken to generate embeddings without FastEmbed or Ray distributed computing: 1463.395 seconds
Time taken to generate embeddings with FastEmbed but without Ray distributed computing: 674.568 seconds
Time taken to generate embeddings with both FastEmbed and Ray distributed computing: 195.922 seconds
Conclusion
In conclusion, we have shown how the combination of Ray, FastEmbed, and Qdrant can accelerate the embedding generation process for thousands of documents. By leveraging parallel processing and efficient embedding techniques, organisations can achieve faster insights from their unstructured data, leading to improved decision-making and enhanced productivity.
Thank you for reading; I hope this article was helpful.
Connect with me: LinkedIn | GitHub | Medium | Hashnode | Email