Knowledge Graphs and Vector Embeddings: A Powerful Combination

Table of contents
- Why Combine Knowledge Graphs and Vector Embeddings?
- Architecture for a Combined System
- 1. Text Processing and Chunking
- 2. Creating Vector Embeddings
- 3. Entity and Relationship Extraction for Knowledge Graph
- 4. Building and Storing the Knowledge Graph
- 5. Linking Vector Embeddings to Knowledge Graphs
- 6. Hybrid Retrieval System: Combining KG and Vector Search
- 7. Combining with an LLM for RAG
- 8. Practical Example: Building a KG-Enhanced RAG System
- 9. Implementing Advanced Knowledge Graph Querying
- 10. Implementing Knowledge Graph Reasoning
- 11. Advanced RAG with Multi-Hop Knowledge Paths
- 12. Building a Complete RAG System with KG Reasoning
- Practical Considerations for Building Your Own System
- Why This Approach is Powerful: A Visual Example
- Real-World Use Cases
- Conclusion: Best Practices for Implementation

Combining knowledge graphs with vector embeddings gives you an information retrieval and reasoning system that can use both explicit relationships and semantic similarity. Let's explore why this combination is so effective, how to implement it, and how to leverage both technologies. I have provided sample code; you will need to make sure your system has all the required libraries installed in compatible versions.
Why Combine Knowledge Graphs and Vector Embeddings?
Knowledge graphs and vector embeddings complement each other: each covers the other's main weakness.
1. Different Strengths and Limitations
Knowledge Graphs:
✓ Explicit relationships between entities
✓ Support logical reasoning and inference
✓ Structured information with clear semantics
✗ Struggle with fuzzy matching and semantic similarity
✗ Limited to explicitly defined relationships
Vector Embeddings:
✓ Excellent at semantic similarity
✓ Can find related concepts even without explicit links
✓ Work well with unstructured text
✗ Lack explicit relationships
✗ "Black box" without clear reasoning
2. Hybrid Approach Benefits
By combining them, you get:
- More accurate answers through multiple validation paths
- Better contextual understanding
- Ability to retrieve both explicitly and implicitly related information
- Enhanced reasoning capabilities
- Improved handling of complex queries
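To make the complementary strengths concrete, here is a toy sketch in plain Python. The triples, the hand-written three-dimensional vectors, and the helper names `graph_neighbors` and `vector_neighbors` are all made up for illustration; a real system would use a graph database and learned embeddings instead.

```python
import math

# Hypothetical toy data: explicit triples and hand-written 3-d "embeddings"
TRIPLES = [
    ("GPT-4", "DEVELOPED_BY", "OpenAI"),
    ("GPT-4", "BUILDS_ON", "GPT-3"),
]
VECTORS = {
    "GPT-4": [0.9, 0.1, 0.0],
    "large language model": [0.8, 0.2, 0.1],
    "graph database": [0.0, 0.1, 0.9],
}

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def graph_neighbors(entity):
    """Explicit facts: returns only what was stored as a triple."""
    return [(rel, tgt) for src, rel, tgt in TRIPLES if src == entity]

def vector_neighbors(entity, top_k=1):
    """Implicit similarity: nearest other items by cosine similarity."""
    scored = [(cosine(VECTORS[entity], vec), name)
              for name, vec in VECTORS.items() if name != entity]
    return [name for _, name in sorted(scored, reverse=True)[:top_k]]

print(graph_neighbors("GPT-4"))   # explicit relationships
print(vector_neighbors("GPT-4"))  # semantically closest item, no link required
```

The graph lookup can explain why two things are connected but finds nothing for "large language model", which was never linked; the vector lookup finds it but cannot say how it relates.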
Architecture for a Combined System
Here's how to build a system that leverages both knowledge graphs and vector embeddings:
┌─────────────────┐      ┌───────────────────┐      ┌───────────────────┐
│   Source Data   │─────▶│  Text Processing  │─────▶│  Entity/Relation  │
│   (Documents)   │      │    & Chunking     │      │    Extraction     │
└─────────────────┘      └───────────────────┘      └─────────┬─────────┘
                                                              │
                                                              ▼
┌─────────────────┐      ┌───────────────────┐      ┌───────────────────┐
│      Query      │─────▶│    Multi-Modal    │◀─────│  Knowledge Graph  │
│   Processing    │      │ Retrieval Engine  │      │   Construction    │
└─────────────────┘      └─────────┬─────────┘      └─────────┬─────────┘
                                   │                          │
                                   ▼                          ▼
                         ┌───────────────────┐      ┌───────────────────┐
                         │   LLM Response    │      │  Vector Database  │
                         │    Generation     │      │   (Embeddings)    │
                         └───────────────────┘      └───────────────────┘
Let's implement this architecture step by step with Python code examples.
1. Text Processing and Chunking
import re
import nltk
from nltk.tokenize import sent_tokenize
import spacy
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Download NLTK resources (newer NLTK releases may also need 'punkt_tab')
nltk.download('punkt')

# Load SpaCy model
nlp = spacy.load("en_core_web_sm")

def preprocess_text(text):
    """Clean and normalize text"""
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)
    # Normalize curly quotes and en dashes to their ASCII equivalents
    text = text.replace('\u201c', '"').replace('\u201d', '"').replace('\u2013', '-')
    return text

def chunk_document(document, chunk_size=1000, chunk_overlap=200):
    """Split document into overlapping chunks"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", ". ", " ", ""]
    )
    chunks = text_splitter.split_text(document)
    return chunks

def extract_sentences(text):
    """Extract sentences from text"""
    return sent_tokenize(text)

# Example usage
document = """
Knowledge graphs and vector embeddings are two powerful technologies in modern AI systems.
Knowledge graphs represent structured relationships between entities. Vector embeddings
capture semantic meaning in continuous vector spaces. Combining them creates robust
information retrieval systems that leverage both explicit relationships and semantic similarity.
"""

# Preprocess
clean_text = preprocess_text(document)

# Create chunks
chunks = chunk_document(clean_text)
print(f"Created {len(chunks)} chunks")

# Extract sentences
sentences = extract_sentences(clean_text)
print(f"Extracted {len(sentences)} sentences")
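To see what `chunk_size` and `chunk_overlap` mean in practice, here is a minimal sliding-window chunker in plain Python. It is a simplified stand-in, not how `RecursiveCharacterTextSplitter` works internally (that splitter prefers to break on the listed separators); the function name `sliding_window_chunks` is hypothetical.

```python
def sliding_window_chunks(text, chunk_size=1000, chunk_overlap=200):
    """Split text into fixed-size character windows that overlap."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks

# Small sizes make the overlap easy to see
demo = sliding_window_chunks("abcdefghij", chunk_size=4, chunk_overlap=2)
print(demo)
```

Each chunk repeats the last two characters of the previous one, which is what keeps a sentence that straddles a boundary retrievable from at least one chunk.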
2. Creating Vector Embeddings
import numpy as np
from sentence_transformers import SentenceTransformer
import pinecone
import os
from uuid import uuid4

# Initialize embedding model
model = SentenceTransformer('all-mpnet-base-v2')  # Good balance of quality and speed

def generate_embeddings(texts):
    """Generate embeddings for a list of text chunks"""
    embeddings = model.encode(texts)
    return embeddings

def store_in_pinecone(chunks, embeddings, namespace="document-embeddings"):
    """Store embeddings in Pinecone vector database"""
    # Initialize Pinecone.
    # NOTE: pinecone.init() and the module-level helpers below belong to the
    # legacy pinecone-client 2.x API; 3.x and later replace them with a
    # Pinecone(api_key=...) client object, so pin your version accordingly.
    pinecone.init(
        api_key=os.environ.get("PINECONE_API_KEY"),
        environment=os.environ.get("PINECONE_ENVIRONMENT")
    )
    # Create or get index
    index_name = "knowledge-base"
    if index_name not in pinecone.list_indexes():
        pinecone.create_index(
            name=index_name,
            dimension=embeddings.shape[1],
            metric="cosine"
        )
    index = pinecone.Index(index_name)
    # Prepare data for upsert
    vectors = []
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
        chunk_id = str(uuid4())
        vectors.append({
            "id": chunk_id,
            "values": embedding.tolist(),
            "metadata": {
                "text": chunk,
                "chunk_index": i
            }
        })
    # Upsert in batches of 100
    batch_size = 100
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i+batch_size]
        index.upsert(vectors=batch, namespace=namespace)
    return index_name

# Example usage
# Generate embeddings for chunks
chunk_embeddings = generate_embeddings(chunks)
print(f"Generated embeddings with shape: {chunk_embeddings.shape}")

# Store in vector database
# index_name = store_in_pinecone(chunks, chunk_embeddings)
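If you want to exercise the rest of the pipeline without a hosted vector database, a small in-memory index can stand in during development. The sketch below is an assumption for illustration only: `InMemoryIndex` is a hypothetical class, it is not the Pinecone API, and it does a brute-force cosine scan that would not scale to a large corpus.

```python
import math

class InMemoryIndex:
    """Hypothetical stand-in for a vector index; not the Pinecone API."""

    def __init__(self):
        self._items = {}  # id -> (values, metadata)

    def upsert(self, vectors):
        """Insert or overwrite vectors given as dicts with id/values/metadata."""
        for v in vectors:
            self._items[v["id"]] = (v["values"], v.get("metadata", {}))

    def query(self, vector, top_k=5):
        """Return the top_k stored vectors ranked by cosine similarity."""
        def cosine(a, b):
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
            return dot / norm if norm else 0.0

        scored = [
            {"id": vid, "score": cosine(vector, values), "metadata": meta}
            for vid, (values, meta) in self._items.items()
        ]
        scored.sort(key=lambda m: m["score"], reverse=True)
        return scored[:top_k]

index = InMemoryIndex()
index.upsert([
    {"id": "a", "values": [1.0, 0.0], "metadata": {"text": "graphs"}},
    {"id": "b", "values": [0.0, 1.0], "metadata": {"text": "vectors"}},
])
best = index.query([0.9, 0.1], top_k=1)[0]
print(best["id"], round(best["score"], 3))
```

The dict shape passed to `upsert` mirrors the `id` / `values` / `metadata` records built in `store_in_pinecone`, so the same list of vectors can be reused.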
3. Entity and Relationship Extraction for Knowledge Graph
import spacy
from spacy.matcher import DependencyMatcher
from itertools import combinations
# Load SpaCy with NER capabilities
nlp = spacy.load("en_core_web_lg")
def extract_entities(text):
"""Extract named entities from text"""
doc = nlp(text)
entities = []
for ent in doc.ents:
entities.append({
"text": ent.text,
"label": ent.label_,
"start": ent.start_char,
"end": ent.end_char
})
# Also capture noun chunks as potential entities
for chunk in doc.noun_chunks:
if chunk.text.lower() not in [e["text"].lower() for e in entities]:
entities.append({
"text": chunk.text,
"label": "CONCEPT",
"start": chunk.start_char,
"end": chunk.end_char
})
return entities
def extract_relationships(text, entities):
"""Extract relationships between entities"""
doc = nlp(text)
relationships = []
# Map entity texts to their info
entity_map = {e["text"].lower(): e for e in entities}
# Create entity spans based on text positions
entity_spans = [(e["text"], doc.char_span(e["start"], e["end"])) for e in entities]
entity_spans = [(text, span) for text, span in entity_spans if span is not None]
# Find relationships based on syntactic dependencies
for entity1, span1 in entity_spans:
for entity2, span2 in entity_spans:
if entity1 != entity2:
# Find the shortest dependency path between entities
if span1 and span2:
path = []
for token in doc:
if span1.start <= token.i <= span1.end or span2.start <= token.i <= span2.end:
path.append(token)
# If entities are close in the dependency tree
if len(path) < 10: # Arbitrary threshold
# Find verbs or prepositions that might indicate relationships
relation_tokens = [token for token in doc if token.pos_ in ["VERB", "ADP"]]
for token in relation_tokens:
if min(abs(token.i - span1.end), abs(token.i - span2.start)) < 5:
relation = token.text
relationships.append({
"source": entity1,
"source_type": entity_map.get(entity1.lower(), {}).get("label", "CONCEPT"),
"relation": relation,
"target": entity2,
"target_type": entity_map.get(entity2.lower(), {}).get("label", "CONCEPT")
})
# Co-occurrence relationships (entities in the same sentence)
for sent in doc.sents:
sent_entities = [
(text, span) for text, span in entity_spans
if span.start >= sent.start and span.end <= sent.end
]
for (entity1, span1), (entity2, span2) in combinations(sent_entities, 2):
if not any(r["source"] == entity1 and r["target"] == entity2 for r in relationships):
relationships.append({
"source": entity1,
"source_type": entity_map.get(entity1.lower(), {}).get("label", "CONCEPT"),
"relation": "RELATED_TO",
"target": entity2,
"target_type": entity_map.get(entity2.lower(), {}).get("label", "CONCEPT")
})
return relationships
# Example
sample_text = """
The OpenAI GPT-4 model was developed by OpenAI researchers including Sam Altman and
Ilya Sutskever. It builds upon previous iterations like GPT-3 and uses transformer
architecture to generate human-like text. The model was trained on diverse text from the internet.
"""
# Extract entities and relationships
entities = extract_entities(sample_text)
print(f"Extracted {len(entities)} entities")
relationships = extract_relationships(sample_text, entities)
print(f"Extracted {len(relationships)} relationships")
4. Building and Storing the Knowledge Graph
from neo4j import GraphDatabase

class KnowledgeGraph:
    def __init__(self, uri, username, password):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        self.driver.close()

    def create_constraints(self):
        """Create constraints for entity uniqueness"""
        with self.driver.session() as session:
            session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE")
            session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (c:Chunk) REQUIRE c.id IS UNIQUE")

    def add_entity(self, name, entity_type, properties=None):
        """Add an entity to the knowledge graph"""
        with self.driver.session() as session:
            # Labels and property keys cannot be Cypher parameters, so entity_type
            # and the property names are interpolated into the query text; only
            # pass trusted, identifier-safe values here.
            properties_str = ""
            if properties:
                # SET expects "e.key = $key" assignments (not "key: $key" map syntax)
                properties_str = ", " + ", ".join(f"e.{k} = ${k}" for k in properties.keys())
            query = f"""
            MERGE (e:Entity {{name: $name}})
            SET e:{entity_type}, e.type = $entity_type{properties_str}
            RETURN e
            """
            params = {"name": name, "entity_type": entity_type}
            if properties:
                params.update(properties)
            result = session.run(query, params)
            return result.single()

    def add_relationship(self, source_name, relation, target_name, properties=None):
        """Add a relationship between entities"""
        with self.driver.session() as session:
            properties_str = ""
            if properties:
                properties_str = "SET " + ", ".join(f"r.{k} = ${k}" for k in properties.keys())
            # The relationship type is interpolated as well, so `relation` must be
            # a valid Cypher identifier (no spaces or punctuation).
            query = f"""
            MATCH (source:Entity {{name: $source_name}})
            MATCH (target:Entity {{name: $target_name}})
            MERGE (source)-[r:{relation}]->(target)
            {properties_str}
            RETURN r
            """
            params = {"source_name": source_name, "target_name": target_name}
            if properties:
                params.update(properties)
            result = session.run(query, params)
            return result.single()

    def add_chunk(self, chunk_id, text, embedding_id=None):
        """Add a text chunk to the knowledge graph"""
        with self.driver.session() as session:
            query = """
            MERGE (c:Chunk {id: $chunk_id})
            SET c.text = $text, c.embedding_id = $embedding_id
            RETURN c
            """
            result = session.run(query, {
                "chunk_id": chunk_id,
                "text": text,
                "embedding_id": embedding_id
            })
            return result.single()

    def link_entity_to_chunk(self, entity_name, chunk_id):
        """Link an entity to a chunk where it appears"""
        with self.driver.session() as session:
            query = """
            MATCH (e:Entity {name: $entity_name})
            MATCH (c:Chunk {id: $chunk_id})
            MERGE (e)-[r:APPEARS_IN]->(c)
            RETURN r
            """
            result = session.run(query, {
                "entity_name": entity_name,
                "chunk_id": chunk_id
            })
            return result.single()
# Example usage
def build_knowledge_graph(chunks, entities_list, relationships_list):
# Initialize knowledge graph
kg = KnowledgeGraph("bolt://localhost:7687", "neo4j", "password")
kg.create_constraints()
# Add chunks
chunk_ids = []
for i, chunk in enumerate(chunks):
chunk_id = f"chunk_{i}"
kg.add_chunk(chunk_id, chunk)
chunk_ids.append(chunk_id)
# Add entities and link to chunks
for i, entities in enumerate(entities_list):
chunk_id = chunk_ids[i]
for entity in entities:
kg.add_entity(entity["text"], entity["label"])
kg.link_entity_to_chunk(entity["text"], chunk_id)
# Add relationships
for i, relationships in enumerate(relationships_list):
for rel in relationships:
kg.add_relationship(rel["source"], rel["relation"], rel["target"])
return kg
# Process all chunks
entities_list = []
relationships_list = []
for chunk in chunks:
chunk_entities = extract_entities(chunk)
chunk_relationships = extract_relationships(chunk, chunk_entities)
entities_list.append(chunk_entities)
relationships_list.append(chunk_relationships)
# Uncomment to build the actual graph
# knowledge_graph = build_knowledge_graph(chunks, entities_list, relationships_list)
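Because `add_relationship` places the relation text directly into the Cypher query, the raw verbs and prepositions produced by `extract_relationships` are worth normalizing first. The helper below is a sketch under that assumption; `to_relationship_type` is a hypothetical name and the `REL_` prefix is an arbitrary choice, not a Neo4j convention.

```python
import re

def to_relationship_type(raw, default="RELATED_TO"):
    """Turn free text such as 'developed by' into a token like 'DEVELOPED_BY'."""
    # Collapse every run of non-alphanumeric characters into one underscore
    cleaned = re.sub(r"[^A-Za-z0-9]+", "_", raw).strip("_").upper()
    if not cleaned:
        return default  # nothing usable was left
    if cleaned[0].isdigit():
        cleaned = "REL_" + cleaned  # avoid a type name that starts with a digit
    return cleaned

for raw in ["developed by", "builds upon", "--"]:
    print(repr(raw), "->", to_relationship_type(raw))
```

It would be applied at the call site, for example `kg.add_relationship(rel["source"], to_relationship_type(rel["relation"]), rel["target"])`.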
5. Linking Vector Embeddings to Knowledge Graphs
The key innovation here is connecting your vector database with your knowledge graph:
def link_embeddings_to_knowledge_graph(kg, index_name, namespace="document-embeddings", vector_ids=None):
    """Link vector embeddings to the knowledge graph"""
    # Connect to vector database
    pinecone.init(
        api_key=os.environ.get("PINECONE_API_KEY"),
        environment=os.environ.get("PINECONE_ENVIRONMENT")
    )
    index = pinecone.Index(index_name)
    # fetch() returns only the vectors whose IDs are requested; it cannot list a
    # whole namespace. Pass the IDs generated at upsert time via vector_ids
    # (a parameter added here); with no IDs there is nothing to link.
    if not vector_ids:
        return False
    fetch_response = index.fetch(ids=list(vector_ids), namespace=namespace)
    vectors = fetch_response.vectors
    # For each vector, add its ID to the corresponding chunk in the knowledge graph
    with kg.driver.session() as session:
        for vector_id, vector_data in vectors.items():
            chunk_index = vector_data.metadata.get("chunk_index")
            # Numeric metadata may come back as a float, so cast before formatting
            chunk_id = f"chunk_{int(chunk_index)}"
            # Update chunk with vector ID
            query = """
            MATCH (c:Chunk {id: $chunk_id})
            SET c.vector_id = $vector_id
            RETURN c
            """
            session.run(query, {
                "chunk_id": chunk_id,
                "vector_id": vector_id
            })
    return True
# This function can be called after building both the vector database and knowledge graph
# link_embeddings_to_knowledge_graph(knowledge_graph, index_name)
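An alternative to a separate linking pass is to give each chunk one deterministic ID and use it in both stores. This is a sketch of that idea, not the approach taken in the code above (which pairs random `uuid4` vector IDs with `chunk_{i}` graph IDs); `stable_chunk_id` and the `doc_name` argument are hypothetical.

```python
import uuid

def stable_chunk_id(doc_name, chunk_index):
    """Derive the same UUID every time for a given document and chunk position."""
    # uuid5 hashes the name within a namespace, so the result is reproducible
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{doc_name}#chunk-{chunk_index}"))

first = stable_chunk_id("docs/page1", 0)
again = stable_chunk_id("docs/page1", 0)
other = stable_chunk_id("docs/page1", 1)
print(first == again, first == other)
```

With a shared ID, the vector record and the `Chunk` node can be matched directly, and re-running ingestion overwrites existing records instead of creating duplicates.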
6. Hybrid Retrieval System: Combining KG and Vector Search
Now let's implement the hybrid retrieval system that uses both vector similarity and knowledge graph relationships:
import numpy as np
class HybridRetriever:
def __init__(self, kg_driver, vector_index, embedding_model, namespace="document-embeddings"):
self.kg_driver = kg_driver
self.vector_index = vector_index
self.embedding_model = embedding_model
self.namespace = namespace
def retrieve_by_vector_similarity(self, query, top_k=5):
"""Retrieve chunks by vector similarity"""
# Generate query embedding
query_embedding = self.embedding_model.encode(query)
# Query vector database
results = self.vector_index.query(
vector=query_embedding.tolist(),
top_k=top_k,
include_metadata=True,
namespace=self.namespace
)
# Extract results
chunks = []
for match in results.matches:
chunks.append({
"text": match.metadata.get("text", ""),
"score": match.score,
"vector_id": match.id
})
return chunks
def retrieve_by_entity(self, entity_name, top_k=5):
"""Retrieve chunks containing a specific entity"""
with self.kg_driver.session() as session:
query = """
MATCH (e:Entity {name: $entity_name})-[:APPEARS_IN]->(c:Chunk)
RETURN c.id as chunk_id, c.text as text
LIMIT $top_k
"""
result = session.run(query, {
"entity_name": entity_name,
"top_k": top_k
})
chunks = []
for record in result:
chunks.append({
"text": record["text"],
"chunk_id": record["chunk_id"],
"score": 1.0 # Default score for KG results
})
return chunks
def retrieve_related_entities(self, entity_name, relationship_type=None, top_k=5):
"""Retrieve entities related to a specific entity"""
with self.kg_driver.session() as session:
rel_clause = ""
if relationship_type:
rel_clause = f":{relationship_type}"
query = f"""
MATCH (e:Entity {{name: $entity_name}})-[r{rel_clause}]->(related:Entity)
RETURN related.name as entity_name, related.type as entity_type, type(r) as relationship
LIMIT $top_k
"""
result = session.run(query, {
"entity_name": entity_name,
"top_k": top_k
})
related = []
for record in result:
related.append({
"name": record["entity_name"],
"type": record["entity_type"],
"relationship": record["relationship"]
})
return related
def hybrid_retrieve(self, query, entity_weight=0.3, top_k=5):
"""Perform hybrid retrieval using both vector similarity and knowledge graph"""
# Extract potential entities from the query
query_entities = extract_entities(query)
# Get vector similarity results
vector_results = self.retrieve_by_vector_similarity(query, top_k=top_k)
# Get knowledge graph results for each entity
kg_results = []
for entity in query_entities:
entity_results = self.retrieve_by_entity(entity["text"], top_k=3)
kg_results.extend(entity_results)
# Combine and rank results
combined_results = {}
# Add vector results
for result in vector_results:
combined_results[result["text"]] = {
"text": result["text"],
"vector_score": result["score"],
"kg_score": 0.0,
"sources": ["vector"]
}
# Add and merge KG results
for result in kg_results:
if result["text"] in combined_results:
combined_results[result["text"]]["kg_score"] = result["score"]
combined_results[result["text"]]["sources"].append("kg")
else:
combined_results[result["text"]] = {
"text": result["text"],
"vector_score": 0.0,
"kg_score": result["score"],
"sources": ["kg"]
}
# Calculate final scores
for text, result in combined_results.items():
result["final_score"] = (1 - entity_weight) * result["vector_score"] + entity_weight * result["kg_score"]
# Sort by final score
ranked_results = sorted(
combined_results.values(),
key=lambda x: x["final_score"],
reverse=True
)
return ranked_results[:top_k]
def retrieve_with_context(self, query, top_k=5):
"""Retrieve chunks with additional context from the knowledge graph"""
# Get initial results
results = self.hybrid_retrieve(query, top_k=top_k)
# Extract entities from results
result_entities = set()
for result in results:
entities = extract_entities(result["text"])
for entity in entities:
result_entities.add(entity["text"])
# Get related entities and their connections
context = {}
for entity in result_entities:
related = self.retrieve_related_entities(entity, top_k=3)
context[entity] = related
# Add context to results
for result in results:
result["context"] = context
return results
# Example usage
# Create hybrid retriever
# retriever = HybridRetriever(
# kg_driver=knowledge_graph.driver,
# vector_index=pinecone.Index(index_name),
# embedding_model=model,
# namespace="document-embeddings"
# )
# query = "What is the relationship between transformers and GPT models?"
# results = retriever.retrieve_with_context(query)
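The ranking in `hybrid_retrieve` comes down to one weighted sum, `final_score = (1 - entity_weight) * vector_score + entity_weight * kg_score`. The worked example below isolates that formula; the helper name `fuse_scores` and the three sample scores are invented for illustration.

```python
def fuse_scores(vector_score, kg_score, entity_weight=0.3):
    """Weighted sum used to rank results from both retrieval paths."""
    return (1 - entity_weight) * vector_score + entity_weight * kg_score

# Hypothetical candidates: (vector similarity, knowledge graph score)
candidates = {
    "found by both": fuse_scores(0.80, 1.0),
    "vector only": fuse_scores(0.90, 0.0),
    "graph only": fuse_scores(0.0, 1.0),
}
ranked = sorted(candidates, key=candidates.get, reverse=True)
for name in ranked:
    print(f"{name}: {candidates[name]:.2f}")
```

A chunk confirmed by both paths outranks a slightly better vector-only match. Since graph hits always score 1.0 here, `entity_weight` effectively acts as a fixed bonus for chunks that mention a query entity.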
7. Combining with an LLM for RAG
Let's integrate the hybrid retriever with an LLM to create a complete RAG system:
from openai import OpenAI
import os
class RAGSystem:
def __init__(self, retriever, openai_api_key=None):
self.retriever = retriever
self.openai_client = OpenAI(api_key=openai_api_key or os.environ.get("OPENAI_API_KEY"))
def generate_prompt(self, query, retrieved_chunks):
"""Generate a prompt for the LLM based on retrieved chunks"""
context = "\n\n".join([chunk["text"] for chunk in retrieved_chunks])
prompt = f"""
You are an AI assistant that provides factual information based on the provided context.
CONTEXT:
{context}
USER QUERY:
{query}
Please answer the user's query based on the context provided. If the information isn't available in the context, say so.
ANSWER:
"""
return prompt
def answer_query(self, query, top_k=5):
"""Answer a query using RAG (Retrieval-Augmented Generation)"""
# Retrieve relevant chunks
retrieved_chunks = self.retriever.retrieve_with_context(query, top_k=top_k)
# Generate prompt
prompt = self.generate_prompt(query, retrieved_chunks)
# Get LLM response
response = self.openai_client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a helpful assistant that answers questions based on provided context."},
{"role": "user", "content": prompt}
],
temperature=0.4,
max_tokens=1000
)
return {
"answer": response.choices[0].message.content,
"sources": retrieved_chunks
}
def answer_with_graph_navigation(self, query, entity=None):
"""Answer queries that require graph navigation"""
# Extract entities from query if none provided
if not entity:
query_entities = extract_entities(query)
if query_entities:
entity = query_entities[0]["text"]
# Get related entities from knowledge graph
if entity:
related_entities = self.retriever.retrieve_related_entities(entity, top_k=10)
# Get chunks containing related entities
chunks = []
for related in related_entities:
entity_chunks = self.retriever.retrieve_by_entity(related["name"], top_k=2)
chunks.extend(entity_chunks)
# Combine with vector retrieval
vector_chunks = self.retriever.retrieve_by_vector_similarity(query, top_k=3)
all_chunks = chunks + vector_chunks
# Remove duplicates
unique_chunks = {}
for chunk in all_chunks:
if chunk["text"] not in unique_chunks:
unique_chunks[chunk["text"]] = chunk
# Generate answer
prompt = self.generate_prompt(query, list(unique_chunks.values()))
response = self.openai_client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a helpful assistant that answers questions based on provided context."},
{"role": "user", "content": prompt}
],
temperature=0.4,
max_tokens=1000
)
return {
"answer": response.choices[0].message.content,
"sources": list(unique_chunks.values()),
"graph_path": related_entities
}
else:
# Fallback to regular RAG if no entity found
return self.answer_query(query, top_k=5)
# Example usage
# rag_system = RAGSystem(retriever)
# response = rag_system.answer_query("How does the GPT-4 model work?")
# print(response["answer"])
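Note that `generate_prompt` only uses the chunk text, so the graph context attached by `retrieve_with_context` never reaches the LLM. One possible way to surface it is to render the relationships as short lines of text and append them to the context. The helper below is a sketch; `format_graph_context` is a hypothetical name and the arrow notation is just one readable choice.

```python
def format_graph_context(context):
    """Render {entity: [{'name', 'type', 'relationship'}, ...]} as text lines."""
    lines = []
    for entity, related in sorted(context.items()):
        for item in related:
            lines.append(
                f"{entity} -[{item['relationship']}]-> {item['name']} ({item['type']})"
            )
    return "\n".join(lines)

# Same shape as the `context` dict built in retrieve_with_context
sample_context = {
    "GPT-4": [
        {"name": "OpenAI", "type": "ORG", "relationship": "developed"},
        {"name": "GPT-3", "type": "CONCEPT", "relationship": "RELATED_TO"},
    ],
    "GPT-3": [],
}
print(format_graph_context(sample_context))
```

The resulting string could be added under its own heading in the prompt, for example a `KNOWN RELATIONSHIPS:` section after `CONTEXT:`.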
8. Practical Example: Building a KG-Enhanced RAG System
import requests
import os
from bs4 import BeautifulSoup
from uuid import uuid4
import json
def fetch_documentation(url):
"""Fetch documentation from a URL"""
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# Extract text from paragraphs
paragraphs = soup.find_all('p')
text = '\n\n'.join([p.get_text() for p in paragraphs])
return text
def process_document(text):
"""Process a document for RAG"""
# Preprocess
clean_text = preprocess_text(text)
# Create chunks
chunks = chunk_document(clean_text)
# Generate embeddings
chunk_embeddings = generate_embeddings(chunks)
# Process entities and relationships
entities_list = []
relationships_list = []
for chunk in chunks:
chunk_entities = extract_entities(chunk)
chunk_relationships = extract_relationships(chunk, chunk_entities)
entities_list.append(chunk_entities)
relationships_list.append(chunk_relationships)
return {
"chunks": chunks,
"embeddings": chunk_embeddings,
"entities_list": entities_list,
"relationships_list": relationships_list
}
def save_processed_data(data, filename):
"""Save processed data to disk"""
# Convert numpy arrays to lists
serializable_data = {
"chunks": data["chunks"],
"embeddings": data["embeddings"].tolist(),
"entities_list": data["entities_list"],
"relationships_list": data["relationships_list"]
}
with open(filename, 'w') as f:
json.dump(serializable_data, f)
def load_processed_data(filename):
"""Load processed data from disk"""
with open(filename, 'r') as f:
data = json.load(f)
# Convert lists back to numpy arrays
data["embeddings"] = np.array(data["embeddings"])
return data
# Main function to build the system
def build_rag_system():
# 1. Fetch and process documents
urls = [
"https://docs.example.com/page1",
"https://docs.example.com/page2"
]
all_data = {
"chunks": [],
"embeddings": [],
"entities_list": [],
"relationships_list": []
}
for url in urls:
# Fetch and process
text = fetch_documentation(url)
processed = process_document(text)
# Append to all data
all_data["chunks"].extend(processed["chunks"])
if len(all_data["embeddings"]) == 0:
all_data["embeddings"] = processed["embeddings"]
else:
all_data["embeddings"] = np.vstack([all_data["embeddings"], processed["embeddings"]])
all_data["entities_list"].extend(processed["entities_list"])
all_data["relationships_list"].extend(processed["relationships_list"])
# 2. Store embeddings in vector database
pinecone.init(
api_key=os.environ.get("PINECONE_API_KEY"),
environment=os.environ.get("PINECONE_ENVIRONMENT")
)
index_name = "documentation-index"
if index_name not in pinecone.list_indexes():
pinecone.create_index(
name=index_name,
dimension=all_data["embeddings"].shape[1],
metric="cosine"
)
index = pinecone.Index(index_name)
# Prepare data for upsert
vectors = []
for i, (chunk, embedding) in enumerate(zip(all_data["chunks"], all_data["embeddings"])):
chunk_id = str(uuid4())
vectors.append({
"id": chunk_id,
"values": embedding.tolist(),
"metadata": {
"text": chunk,
"chunk_index": i
}
})
# Upsert in batches
batch_size = 100
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i+batch_size]
index.upsert(vectors=batch, namespace="documentation")
# 3. Build knowledge graph
kg = KnowledgeGraph("bolt://localhost:7687", "neo4j", "password")
kg.create_constraints()
# Add chunks
chunk_ids = []
for i, chunk in enumerate(all_data["chunks"]):
chunk_id = f"chunk_{i}"
kg.add_chunk(chunk_id, chunk)
chunk_ids.append(chunk_id)
# Add entities and link to chunks
for i, entities in enumerate(all_data["entities_list"]):
chunk_id = chunk_ids[i]
for entity in entities:
kg.add_entity(entity["text"], entity["label"])
kg.link_entity_to_chunk(entity["text"], chunk_id)
# Add relationships
for i, relationships in enumerate(all_data["relationships_list"]):
for rel in relationships:
kg.add_relationship(rel["source"], rel["relation"], rel["target"])
# 4. Link vector IDs to knowledge graph chunks
for i, vector in enumerate(vectors):
chunk_id = f"chunk_{vector['metadata']['chunk_index']}"
with kg.driver.session() as session:
query = """
MATCH (c:Chunk {id: $chunk_id})
SET c.vector_id = $vector_id
RETURN c
"""
session.run(query, {
"chunk_id": chunk_id,
"vector_id": vector["id"]
})
# 5. Create hybrid retriever
model = SentenceTransformer('all-mpnet-base-v2')
retriever = HybridRetriever(
kg_driver=kg.driver,
vector_index=index,
embedding_model=model,
namespace="documentation"
)
# 6. Create RAG system
rag_system = RAGSystem(retriever)
return rag_system
# Example usage
# system = build_rag_system()
# response = system.answer_query("How do knowledge graphs work with vector embeddings?")
# print(response["answer"])
9. Implementing Advanced Knowledge Graph Querying
One of the most significant advantages of combining vector embeddings with knowledge graphs is the ability to perform complex, multi-hop queries. Let's implement some advanced querying capabilities:
class AdvancedKGRetriever:
def __init__(self, kg_driver):
self.kg_driver = kg_driver
def entity_based_search(self, entity_type, property_filters=None, limit=10):
"""Search for entities of a specific type with property filters"""
with self.kg_driver.session() as session:
# Build WHERE clause for property filters
where_clause = ""
params = {"limit": limit}
if property_filters:
conditions = []
for i, (prop, value) in enumerate(property_filters.items()):
param_name = f"prop_{i}"
conditions.append(f"e.{prop} = ${param_name}")
params[param_name] = value
if conditions:
where_clause = "WHERE " + " AND ".join(conditions)
# Build query
query = f"""
MATCH (e:{entity_type})
{where_clause}
RETURN e.name as name, e.type as type, properties(e) as properties
LIMIT $limit
"""
result = session.run(query, params)
entities = []
for record in result:
entities.append({
"name": record["name"],
"type": record["type"],
"properties": record["properties"]
})
return entities
    def path_based_search(self, start_entity, end_entity, max_hops=3):
        """Find paths between two entities in the knowledge graph"""
        # Variable-length bounds cannot be Cypher parameters, so the hop limit is
        # coerced to an int and interpolated into the query text
        max_hops = int(max_hops)
        with self.kg_driver.session() as session:
            query = f"""
            MATCH path = shortestPath((start:Entity {{name: $start_name}})-[*1..{max_hops}]-(end:Entity {{name: $end_name}}))
            RETURN [node in nodes(path) | node.name] as entities,
                   [rel in relationships(path) | type(rel)] as relationships
            """
            result = session.run(query, {
                "start_name": start_entity,
                "end_name": end_entity
            })
            paths = []
            for record in result:
                path = []
                entities = record["entities"]
                relationships = record["relationships"]
                for i in range(len(relationships)):
                    path.append({
                        "source": entities[i],
                        "relation": relationships[i],
                        "target": entities[i+1]
                    })
                paths.append(path)
            return paths

    def subgraph_extraction(self, entity_name, depth=2):
        """Extract a subgraph around an entity"""
        # As above, the depth bound is interpolated rather than passed as a parameter
        depth = int(depth)
        with self.kg_driver.session() as session:
            query = f"""
            MATCH path = (center:Entity {{name: $entity_name}})-[*1..{depth}]-(connected)
            RETURN nodes(path) as nodes, relationships(path) as relationships
            """
            result = session.run(query, {"entity_name": entity_name})
            nodes = set()
            relationships = []
            for record in result:
                for node in record["nodes"]:
                    nodes.add((node["name"], node.labels))
                for rel in record["relationships"]:
                    source = rel.start_node["name"]
                    target = rel.end_node["name"]
                    # rel.type is the relationship type name; Python's type(rel)
                    # would return the driver class instead
                    rel_type = rel.type
                    relationships.append({
                        "source": source,
                        "relation": rel_type,
                        "target": target
                    })
            return {
                "nodes": list(nodes),
                "relationships": relationships
            }
def semantic_graph_search(self, text_query, embedding_model, top_k=5):
"""Search for entities semantically similar to a text query"""
# Generate query embedding
query_embedding = embedding_model.encode(text_query)
# Get all entities with their descriptions
with self.kg_driver.session() as session:
query = """
MATCH (e:Entity)
RETURN e.name as name, e.type as type,
CASE WHEN e.description IS NOT NULL THEN e.description ELSE e.name END as text
"""
result = session.run(query)
entities = []
for record in result:
entities.append({
"name": record["name"],
"type": record["type"],
"text": record["text"]
})
# Generate embeddings for all entities
entity_texts = [entity["text"] for entity in entities]
entity_embeddings = embedding_model.encode(entity_texts)
# Calculate similarity scores
similarities = []
for i, entity in enumerate(entities):
entity_embedding = entity_embeddings[i]
similarity = np.dot(query_embedding, entity_embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(entity_embedding)
)
similarities.append({
"entity": entity,
"score": similarity
})
# Sort by similarity score
similarities.sort(key=lambda x: x["score"], reverse=True)
return similarities[:top_k]
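To show what a hop-limited shortest-path query computes, here is a breadth-first search over an in-memory edge list. It illustrates the idea behind `path_based_search` rather than how Neo4j implements `shortestPath`; the function name `shortest_path` and the sample edges are made up.

```python
from collections import deque

def shortest_path(edges, start, goal, max_hops=3):
    """Breadth-first search over undirected (source, relation, target) edges."""
    neighbors = {}
    for src, rel, tgt in edges:
        neighbors.setdefault(src, []).append((rel, tgt))
        neighbors.setdefault(tgt, []).append((rel, src))  # allow either direction

    queue = deque([(start, [])])
    visited = {start}
    while queue:
        node, path = queue.popleft()
        if node == goal and path:
            return path  # BFS reaches the goal by a shortest route first
        if len(path) == max_hops:
            continue  # hop limit reached; do not expand further
        for rel, nxt in neighbors.get(node, []):
            if nxt not in visited:
                visited.add(nxt)
                # Each step is recorded in traversal order, not stored direction
                queue.append((nxt, path + [(node, rel, nxt)]))
    return None

edges = [
    ("GPT-4", "BUILDS_ON", "GPT-3"),
    ("GPT-3", "USES", "Transformer"),
    ("Transformer", "INTRODUCED_BY", "Google"),
]
print(shortest_path(edges, "GPT-4", "Transformer"))
```

Each returned step names the relation that was crossed, which is the same information `path_based_search` assembles from `nodes(path)` and `relationships(path)`.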
10. Implementing Knowledge Graph Reasoning
One of the most powerful aspects of knowledge graphs is the ability to perform reasoning and inference. Let's implement some reasoning capabilities:
class KGReasoner:
    def __init__(self, kg_driver):
        self.kg_driver = kg_driver

    def find_analogies(self, source_entity1, source_entity2, target_entity):
        """Find analogies (A is to B as C is to ?)"""
        with self.kg_driver.session() as session:
            # First, find the relationship path between the source entities
            query1 = """
            MATCH path = shortestPath((e1:Entity {name: $source1})-[r*1..3]-(e2:Entity {name: $source2}))
            RETURN [node in nodes(path) | node.name] as entities,
                   [rel in relationships(path) | type(rel)] as relationships
            """
            result1 = session.run(query1, {
                "source1": source_entity1,
                "source2": source_entity2
            })
            source_path = None
            for record in result1:
                source_path = {
                    "entities": record["entities"],
                    "relationships": record["relationships"]
                }
                break
            if not source_path:
                return []
            # Now, find entities that have the same relationship with the target entity.
            # This is a simplified approach - a more robust implementation would match
            # the entire path pattern, not just the first relationship.
            # The relationship type is passed as a parameter and compared with type()
            # instead of being formatted into the query string.
            query2 = """
            MATCH (e1:Entity {name: $target})-[r1]-(e2:Entity)
            WHERE type(r1) = $rel_type
            RETURN e2.name as entity, e2.type as type
            """
            result2 = session.run(query2, {
                "target": target_entity,
                "rel_type": source_path["relationships"][0]
            })
            analogous_entities = []
            for record in result2:
                analogous_entities.append({
                    "name": record["entity"],
                    "type": record["type"],
                    "analogy": f"{source_entity1} is to {source_entity2} as {target_entity} is to {record['entity']}"
                })
            return analogous_entities

    def infer_relationships(self):
        """Infer new relationships based on patterns in the knowledge graph"""
        inferred = []
        with self.kg_driver.session() as session:
            # Infer transitive relationships
            # If A PART_OF B and B PART_OF C, then A PART_OF C
            query1 = """
            MATCH (a:Entity)-[:PART_OF]->(b:Entity)-[:PART_OF]->(c:Entity)
            WHERE NOT (a)-[:PART_OF]->(c)
            RETURN DISTINCT a.name as source, c.name as target
            """
            # Read all candidates before writing, so reads and writes do not interleave
            candidates = list(session.run(query1))
            for record in candidates:
                # Create the inferred relationship
                create_query = """
                MATCH (a:Entity {name: $source}), (c:Entity {name: $target})
                MERGE (a)-[r:PART_OF {inferred: true}]->(c)
                RETURN r
                """
                session.run(create_query, {
                    "source": record["source"],
                    "target": record["target"]
                })
                inferred.append({
                    "source": record["source"],
                    "relation": "PART_OF",
                    "target": record["target"],
                    "rule": "Transitive PART_OF"
                })
            # Infer IS_A relationships
            # If A HAS_PROPERTY B and C HAS_PROPERTY B and C IS_A D, then A might be IS_A D
            # count(DISTINCT b) avoids counting a shared property once per matching C
            query2 = """
            MATCH (a:Entity)-[:HAS_PROPERTY]->(b:Entity),
                  (c:Entity)-[:HAS_PROPERTY]->(b:Entity),
                  (c)-[:IS_A]->(d:Entity)
            WHERE NOT (a)-[:IS_A]->(d) AND a <> c
            RETURN a.name as source, d.name as target, count(DISTINCT b) as property_count
            """
            candidates = list(session.run(query2))
            for record in candidates:
                if record["property_count"] >= 3:  # Threshold for inference
                    # Create the inferred relationship
                    create_query = """
                    MATCH (a:Entity {name: $source}), (d:Entity {name: $target})
                    MERGE (a)-[r:IS_A {inferred: true, confidence: $confidence}]->(d)
                    RETURN r
                    """
                    confidence = min(record["property_count"] / 10, 0.9)
                    session.run(create_query, {
                        "source": record["source"],
                        "target": record["target"],
                        "confidence": confidence
                    })
                    inferred.append({
                        "source": record["source"],
                        "relation": "IS_A",
                        "target": record["target"],
                        "confidence": confidence,
                        "rule": "Common properties inference"
                    })
        return inferred
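To make the transitive PART_OF rule concrete without a running database, here is a minimal in-memory sketch. The function name `infer_transitive` and the sample entities are hypothetical and only illustrate the rule; they are not part of the `KGReasoner` class above.

```python
from collections import defaultdict


def infer_transitive(edges):
    """Return PART_OF pairs implied by transitivity but absent from `edges`.

    `edges` is a list of (child, parent) tuples. This is a toy stand-in
    for the graph, not the Neo4j-backed reasoner.
    """
    parents = defaultdict(set)
    for child, parent in edges:
        parents[child].add(parent)

    known = set(edges)
    inferred = set()
    for start in list(parents):
        # Walk every ancestor reachable from `start`
        stack = list(parents[start])
        seen = set()
        while stack:
            node = stack.pop()
            if node in seen:
                continue
            seen.add(node)
            stack.extend(parents.get(node, ()))
        for ancestor in seen:
            if ancestor != start and (start, ancestor) not in known:
                inferred.add((start, ancestor))
    return inferred


edges = [
    ("qubit", "quantum processor"),
    ("quantum processor", "quantum computer"),
    ("quantum computer", "data center"),
]
for child, ancestor in sorted(infer_transitive(edges)):
    print(f"{child} PART_OF {ancestor} (inferred)")
```

Note one difference: this sketch computes the full transitive closure in one call, while the Cypher rule above only matches two-hop chains per run, so it may need to be run repeatedly to reach the same result on longer chains.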
11. Advanced RAG with Multi-Hop Knowledge Paths
Next, let's build a more advanced retriever that follows multi-hop paths in the knowledge graph and merges what it finds with vector search results:
class MultiHopRetriever:
    def __init__(self, kg_driver, vector_index, embedding_model, namespace="documentation"):
        self.kg_driver = kg_driver
        self.vector_index = vector_index
        self.embedding_model = embedding_model
        self.namespace = namespace

    def extract_query_entities(self, query):
        """Extract entities from the query"""
        entities = extract_entities(query)
        return entities

    def get_multi_hop_context(self, entity_name, max_hops=2, max_paths=3):
        """Get multi-hop context from knowledge graph"""
        with self.kg_driver.session() as session:
            # Find multi-hop paths starting from the entity.
            # Cypher does not accept a parameter as a variable-length bound,
            # so the hop limit is cast to int and formatted into the query text.
            query = """
            MATCH path = (start:Entity {name: $entity_name})-[*1..%d]-(end:Entity)
            WHERE end <> start
            RETURN path, length(path) as path_length
            ORDER BY path_length
            LIMIT $max_paths
            """ % int(max_hops)
            result = session.run(query, {
                "entity_name": entity_name,
                "max_paths": max_paths
            })
            paths = []
            for record in result:
                path = record["path"]
                nodes = [node["name"] for node in path.nodes]
                relationships = [rel.type for rel in path.relationships]
                path_data = {
                    "nodes": nodes,
                    "relationships": relationships,
                    "length": record["path_length"]
                }
                paths.append(path_data)
            return paths

    def get_chunks_from_paths(self, paths):
        """Get text chunks associated with entities in paths"""
        entities = set()
        for path in paths:
            for node in path["nodes"]:
                entities.add(node)
        chunks = []
        with self.kg_driver.session() as session:
            for entity in entities:
                query = """
                MATCH (e:Entity {name: $entity_name})-[:APPEARS_IN]->(c:Chunk)
                RETURN c.text as text, c.id as id
                LIMIT 2
                """
                result = session.run(query, {
                    "entity_name": entity
                })
                for record in result:
                    chunks.append({
                        "text": record["text"],
                        "source": f"Entity: {entity}",
                        "chunk_id": record["id"]
                    })
        return chunks

    def retrieve(self, query, top_k=5):
        """Retrieve chunks using multi-hop knowledge paths and vector similarity"""
        # Extract entities from query
        query_entities = self.extract_query_entities(query)
        # Get vector similarity results
        query_embedding = self.embedding_model.encode(query)
        vector_results = self.vector_index.query(
            vector=query_embedding.tolist(),
            top_k=top_k,
            include_metadata=True,
            namespace=self.namespace
        )
        vector_chunks = []
        for match in vector_results.matches:
            vector_chunks.append({
                "text": match.metadata.get("text", ""),
                "score": match.score,
                "source": "Vector similarity"
            })
        # Get knowledge graph results using multi-hop paths
        kg_chunks = []
        for entity in query_entities:
            # Get multi-hop paths
            paths = self.get_multi_hop_context(entity["text"], max_hops=2, max_paths=3)
            # Get chunks from paths
            path_chunks = self.get_chunks_from_paths(paths)
            kg_chunks.extend(path_chunks)
        # Combine results (vector matches first, then graph-derived chunks)
        all_chunks = vector_chunks + kg_chunks
        # Remove duplicates, keeping the first occurrence of each text
        unique_chunks = {}
        for chunk in all_chunks:
            if chunk["text"] not in unique_chunks:
                unique_chunks[chunk["text"]] = chunk
        # Return combined results
        return list(unique_chunks.values())
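The multi-hop lookup is easier to follow on a toy graph. The following is a minimal in-memory sketch of bounded path enumeration, returning the same `nodes` / `relationships` / `length` shape as `get_multi_hop_context`. The helper names `build_adjacency` and `multi_hop_paths` and the sample triples are hypothetical. Unlike the Cypher pattern, this sketch never revisits a node within a path.

```python
from collections import defaultdict


def build_adjacency(triples):
    """Index (source, relation, target) triples as an undirected adjacency map."""
    adjacency = defaultdict(list)
    for source, relation, target in triples:
        adjacency[source].append((relation, target))
        adjacency[target].append((relation, source))
    return adjacency


def multi_hop_paths(adjacency, start, max_hops=2, max_paths=3):
    """Enumerate simple paths of 1..max_hops edges from `start`, shortest first."""
    paths = []
    frontier = [([start], [])]  # (nodes so far, relationship types so far)
    for _ in range(max_hops):
        next_frontier = []
        for nodes, relations in frontier:
            for relation, neighbor in adjacency.get(nodes[-1], []):
                if neighbor in nodes:  # do not walk back over visited nodes
                    continue
                new_nodes = nodes + [neighbor]
                new_relations = relations + [relation]
                next_frontier.append((new_nodes, new_relations))
                paths.append({
                    "nodes": new_nodes,
                    "relationships": new_relations,
                    "length": len(new_relations),
                })
                if len(paths) >= max_paths:
                    return paths
        frontier = next_frontier
    return paths


triples = [
    ("Pinecone", "IS_A", "Vector Database"),
    ("Vector Database", "STORES", "Embeddings"),
    ("Embeddings", "CREATED_BY", "Embedding Model"),
]
for path in multi_hop_paths(build_adjacency(triples), "Pinecone", max_hops=2):
    print(path["length"], " -> ".join(path["nodes"]), path["relationships"])
```

Expanding level by level means shorter paths are emitted first, which mirrors the `ORDER BY path_length` clause in the query above.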
12. Building a Complete RAG System with KG Reasoning
Finally, let's put everything together to create a comprehensive RAG system that uses both vector embeddings and knowledge graph reasoning:
import os

from openai import OpenAI


class EnhancedRAGSystem:
    def __init__(self, kg_driver, vector_index, embedding_model, openai_api_key=None):
        self.kg_driver = kg_driver
        self.vector_index = vector_index
        self.embedding_model = embedding_model
        self.openai_client = OpenAI(api_key=openai_api_key or os.environ.get("OPENAI_API_KEY"))
        # Initialize components
        self.multi_hop_retriever = MultiHopRetriever(
            kg_driver=kg_driver,
            vector_index=vector_index,
            embedding_model=embedding_model
        )
        self.kg_reasoner = KGReasoner(kg_driver)

    def answer_query(self, query, use_reasoning=True):
        """Answer a query using enhanced RAG with KG reasoning"""
        # Retrieve relevant chunks
        retrieved_chunks = self.multi_hop_retriever.retrieve(query, top_k=7)
        # Extract entities and relationships for reasoning
        query_entities = extract_entities(query)
        reasoning_context = ""
        if use_reasoning and query_entities:
            # Infer new relationships
            # Note: this writes inferred edges to the graph on every call
            inferred_relationships = self.kg_reasoner.infer_relationships()
            # Find analogies for entities in the query
            analogies = []
            for entity in query_entities:
                # This is a simplified example - you'd need to select appropriate entity pairs
                if len(query_entities) > 1:
                    for other_entity in query_entities:
                        if entity["text"] != other_entity["text"]:
                            analogy_results = self.kg_reasoner.find_analogies(
                                entity["text"], other_entity["text"], entity["text"]
                            )
                            analogies.extend(analogy_results)
            # Add reasoning context
            if inferred_relationships:
                reasoning_context += "Inferred relationships:\n"
                for rel in inferred_relationships[:5]:  # Limit to the first 5
                    reasoning_context += f"- {rel['source']} {rel['relation']} {rel['target']} (confidence: {rel.get('confidence', 'high')})\n"
            if analogies:
                reasoning_context += "\nAnalogies:\n"
                for analogy in analogies[:3]:  # Limit to the first 3
                    reasoning_context += f"- {analogy['analogy']}\n"
        # Prepare context for LLM
        chunk_context = "\n\n".join([chunk["text"] for chunk in retrieved_chunks])
        # Generate prompt with both retrieval and reasoning
        prompt = f"""
        You are an AI assistant that provides factual information based on the provided context.
        RETRIEVED CHUNKS:
        {chunk_context}
        {reasoning_context if reasoning_context else ""}
        USER QUERY:
        {query}
        Please answer the user's query based on the context provided. When multiple perspectives or sources are available,
        consider them and provide a balanced answer. If you use information from the knowledge graph reasoning, make it clear.
        If the information isn't available in the context, acknowledge the limitations of your answer.
        ANSWER:
        """
        # Get LLM response
        response = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers questions based on provided context."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.4,
            max_tokens=1000
        )
        return {
            "answer": response.choices[0].message.content,
            "sources": retrieved_chunks,
            "reasoning": reasoning_context if reasoning_context else None
        }
Practical Considerations for Building Your Own System
When implementing a combined vector embedding and knowledge graph system, keep these practical considerations in mind:
1. Data Quality and Preprocessing
import re


def clean_and_normalize_entities(entities_list):
    """Clean and normalize entities to improve matching"""
    normalized_entities = []
    for entities in entities_list:
        cleaned = []
        for entity in entities:
            # Convert to lowercase for matching
            entity["text"] = entity["text"].lower()
            # Remove a leading article
            entity["text"] = re.sub(r'^(the|a|an) ', '', entity["text"])
            # Remove punctuation at the end
            entity["text"] = re.sub(r'[.,;:!?]$', '', entity["text"])
            # Add to cleaned list if not too short
            if len(entity["text"]) > 2:
                cleaned.append(entity)
        normalized_entities.append(cleaned)
    return normalized_entities
2. Entity Resolution and Deduplication
import difflib


def resolve_entities(entity_list):
    """Resolve and deduplicate entities"""
    # Group entities by similarity
    entity_groups = {}
    for entity in entity_list:
        text = entity["text"]
        found_match = False
        for canonical, group in entity_groups.items():
            # Check if similar using string similarity
            similarity = difflib.SequenceMatcher(None, text, canonical).ratio()
            if similarity > 0.85:
                group.append(entity)
                found_match = True
                break
        if not found_match:
            entity_groups[text] = [entity]
    # For each group, select canonical representation
    resolved_entities = {}
    for canonical, group in entity_groups.items():
        # Choose the most frequent label
        labels = [e["label"] for e in group]
        most_common_label = max(set(labels), key=labels.count)
        # Use the most common form of the entity
        texts = [e["text"] for e in group]
        most_common_text = max(set(texts), key=texts.count)
        resolved_entities[most_common_text] = {
            "text": most_common_text,
            "label": most_common_label,
            "alternatives": list(set(texts))
        }
    return list(resolved_entities.values())
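Before settling on the 0.85 threshold, it may help to check how `difflib.SequenceMatcher` scores a few pairs from your own data. The sketch below uses a hypothetical helper, `is_same_entity`, and made-up entity strings. It shows that a plural form is merged as intended, but also that two distinct entities differing by one character can clear the threshold.

```python
import difflib


def is_same_entity(a, b, threshold=0.85):
    """Return (merged, ratio) using the same similarity test as resolve_entities."""
    ratio = difflib.SequenceMatcher(None, a, b).ratio()
    return ratio > threshold, ratio


pairs = [
    ("knowledge graph", "knowledge graphs"),  # plural form, should merge
    ("neo4j", "neo4j database"),              # longer variant, falls below threshold
    ("python 2", "python 3"),                 # distinct entities, still above threshold
]
for a, b in pairs:
    merged, ratio = is_same_entity(a, b)
    print(f"{a!r} vs {b!r}: ratio={ratio:.3f} merged={merged}")
```

If false merges like the last pair matter for your domain, one option is to compare entity labels as well, or to resolve entities with embeddings instead of character overlap.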
3. Incremental Updates
from uuid import uuid4


def update_knowledge_system(new_document, kg, vector_index, embedding_model):
    """Process a new document and update both KG and vector DB"""
    # Process the document
    processed = process_document(new_document)
    # Add to vector database
    vectors = []
    for i, (chunk, embedding) in enumerate(zip(processed["chunks"], processed["embeddings"])):
        chunk_id = str(uuid4())
        vectors.append({
            "id": chunk_id,
            "values": embedding.tolist(),
            "metadata": {
                "text": chunk,
                "chunk_index": i
            }
        })
    # Upsert to vector index in batches rather than one request per vector
    batch_size = 100
    for start in range(0, len(vectors), batch_size):
        vector_index.upsert(vectors=vectors[start:start + batch_size], namespace="documentation")
    # Add to knowledge graph
    chunk_ids = []
    for i, chunk in enumerate(processed["chunks"]):
        chunk_id = f"chunk_{uuid4()}"
        kg.add_chunk(chunk_id, chunk, vectors[i]["id"])
        chunk_ids.append(chunk_id)
    # Add entities and link to chunks
    for i, entities in enumerate(processed["entities_list"]):
        chunk_id = chunk_ids[i]
        for entity in entities:
            kg.add_entity(entity["text"], entity["label"])
            kg.link_entity_to_chunk(entity["text"], chunk_id)
    # Add relationships
    for relationships in processed["relationships_list"]:
        for rel in relationships:
            kg.add_relationship(rel["source"], rel["relation"], rel["target"])
    return {
        "added_chunks": len(processed["chunks"]),
        "added_entities": sum(len(entities) for entities in processed["entities_list"]),
        "added_relationships": sum(len(rels) for rels in processed["relationships_list"])
    }
4. Monitoring and Evaluation
def evaluate_retrieval_quality(system, test_queries, ground_truth):
    """Evaluate the quality of the retrieval system"""
    results = {
        "precision": [],
        "recall": [],
        "mrr": []  # Mean Reciprocal Rank
    }
    for i, query in enumerate(test_queries):
        # Get retrieved chunks
        retrieved = system.multi_hop_retriever.retrieve(query, top_k=10)
        retrieved_texts = [chunk["text"] for chunk in retrieved]
        # Calculate precision and recall
        relevant = ground_truth[i]
        true_positives = len(set(retrieved_texts) & set(relevant))
        precision = true_positives / len(retrieved_texts) if retrieved_texts else 0
        recall = true_positives / len(relevant) if relevant else 0
        results["precision"].append(precision)
        results["recall"].append(recall)
        # Calculate reciprocal rank of the first relevant result (0 if none)
        for j, text in enumerate(retrieved_texts):
            if text in relevant:
                results["mrr"].append(1 / (j + 1))
                break
        else:
            results["mrr"].append(0)

    # Calculate averages, guarding against empty inputs
    def mean(values):
        return sum(values) / len(values) if values else 0.0

    avg_precision = mean(results["precision"])
    avg_recall = mean(results["recall"])
    # F1 from the averaged precision and recall; 0 when both are 0
    if avg_precision + avg_recall > 0:
        f1_score = 2 * avg_precision * avg_recall / (avg_precision + avg_recall)
    else:
        f1_score = 0.0
    avg_results = {
        "avg_precision": avg_precision,
        "avg_recall": avg_recall,
        "avg_mrr": mean(results["mrr"]),
        "f1_score": f1_score
    }
    return avg_results
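To sanity-check these metrics by hand, here is a small worked example for a single query. The function `retrieval_metrics` and the chunk IDs are hypothetical; it applies the same precision, recall, reciprocal-rank, and F1 definitions as the evaluation function above, without needing a live retriever.

```python
def retrieval_metrics(retrieved, relevant):
    """Compute precision, recall, reciprocal rank, and F1 for one query."""
    relevant_set = set(relevant)
    hits = len(set(retrieved) & relevant_set)
    precision = hits / len(retrieved) if retrieved else 0.0
    recall = hits / len(relevant_set) if relevant_set else 0.0
    # Reciprocal rank of the first relevant item, 0 if none was retrieved
    reciprocal_rank = 0.0
    for rank, item in enumerate(retrieved, start=1):
        if item in relevant_set:
            reciprocal_rank = 1.0 / rank
            break
    if precision + recall > 0:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0
    return {
        "precision": precision,
        "recall": recall,
        "reciprocal_rank": reciprocal_rank,
        "f1": f1,
    }


# 4 chunks retrieved, 3 relevant overall, 2 of them found, first hit at rank 2
metrics = retrieval_metrics(["c3", "c1", "c7", "c2"], ["c1", "c2", "c5"])
print(metrics)
```

Here precision is 2/4, recall is 2/3, the reciprocal rank is 1/2, and F1 works out to 4/7.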
Why This Approach is Powerful: A Visual Example
Here's a concrete example of how knowledge graph reasoning enhances vector-based retrieval:
Consider a query: "What are the environmental impacts of quantum computing?"
Vector Retrieval Alone:
- Finds documents with terms like "environmental," "impact," and "quantum computing"
- Might miss documents discussing "energy consumption" or "carbon footprint" of "data centers" if they don't explicitly mention "quantum computing"
Knowledge Graph Enhanced Retrieval:
    Environment
        |
        | HAS_ASPECT
        v
    Carbon Footprint <-- MEASURED_BY -- Energy Consumption
        |                                     |
        | AFFECTED_BY                         | CHARACTERISTIC_OF
        v                                     v
    Computing Centers -- HOUSES --> Quantum Computers
Here, the system can follow these paths to retrieve documents about energy consumption in data centers, even when they don't explicitly mention quantum computing, because it knows how these concepts are related.
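As a rough illustration of that traversal, the sketch below encodes the concepts from the example as triples and collects everything within a few hops of "Quantum Computers". The edge directions, the `TRIPLES` list, and the `related_concepts` helper are assumptions made for this example; edges are treated as undirected, as in the `-[...]-` Cypher patterns used earlier.

```python
from collections import deque

# Assumed encoding of the example graph; directions are illustrative only
TRIPLES = [
    ("Environment", "HAS_ASPECT", "Carbon Footprint"),
    ("Energy Consumption", "MEASURED_BY", "Carbon Footprint"),
    ("Carbon Footprint", "AFFECTED_BY", "Computing Centers"),
    ("Energy Consumption", "CHARACTERISTIC_OF", "Quantum Computers"),
    ("Computing Centers", "HOUSES", "Quantum Computers"),
]


def related_concepts(triples, start, max_hops=2):
    """Return {concept: hop distance} for concepts within max_hops of `start`."""
    neighbors = {}
    for source, _relation, target in triples:
        neighbors.setdefault(source, set()).add(target)
        neighbors.setdefault(target, set()).add(source)

    distances = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if distances[node] == max_hops:
            continue  # do not expand beyond the hop limit
        for neighbor in neighbors.get(node, ()):
            if neighbor not in distances:
                distances[neighbor] = distances[node] + 1
                queue.append(neighbor)
    del distances[start]
    return distances


expansion = related_concepts(TRIPLES, "Quantum Computers", max_hops=2)
for concept, hops in sorted(expansion.items(), key=lambda item: item[1]):
    print(f"{hops} hop(s): {concept}")
```

The resulting concepts can then be used to look up linked chunks, which is how a document about data center energy use becomes reachable from a quantum computing query.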
Real-World Use Cases
1. Enterprise Search & Knowledge Management
- Connect internal documents, projects, people, and skills
- Find not just exact matches but conceptually related content
2. Scientific Research
- Connect papers, experiments, findings, and researchers
- Discover implicit connections between different scientific domains
3. Customer Support Systems
- Build a rich knowledge base of product issues, solutions, and related concepts
- Navigate complex troubleshooting steps with logical reasoning
4. E-commerce & Recommendation
- Create product knowledge graphs with features, categories, and compatible items
- Enhance recommendations with relationship-based reasoning
5. Legal Document Analysis
- Connect laws, precedents, legal concepts, and cases
- Follow chains of reasoning through complex legal frameworks
Conclusion: Best Practices for Implementation
1. Start Simple: Begin with basic vector search, then add knowledge graph capabilities incrementally.
2. Focus on Quality: Invest in high-quality entity extraction and relationship identification.
3. Combine Methods: Use a hybrid approach that weights both vector similarity and graph relationships when ranking results.
4. Iterative Improvement: Continuously refine your graph structure based on user feedback and query analysis.
5. User Feedback Loop: Implement mechanisms to learn from user interactions to improve retrieval quality.
6. Explainability: Make the system's reasoning transparent by showing the paths and relationships used.
7. Balance Comprehensiveness and Precision: More entities and relationships aren't always better; focus on quality connections.
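One simple way to weight vector similarity against graph relationships is a linear blend, sketched below. The functions `hybrid_score` and `rank_chunks`, the `alpha` weight, and the hop-based graph score are all assumptions chosen for illustration, not a scheme prescribed by this article. Tune them against your own evaluation set.

```python
def hybrid_score(vector_score, hop_distance, alpha=0.7, max_hops=3):
    """Blend vector similarity with graph proximity.

    hop_distance is the number of hops between a query entity and the chunk's
    entity, or None when the chunk has no graph link to the query.
    """
    if hop_distance is None:
        graph_score = 0.0
    else:
        # 0 hops -> 1.0, decreasing linearly; clipped at 0 beyond max_hops
        graph_score = max(0.0, 1.0 - hop_distance / (max_hops + 1))
    return alpha * vector_score + (1 - alpha) * graph_score


def rank_chunks(candidates, alpha=0.7, max_hops=3):
    """Sort candidate chunks by hybrid score, best first."""
    return sorted(
        candidates,
        key=lambda c: hybrid_score(c["vector_score"], c.get("hop_distance"), alpha, max_hops),
        reverse=True,
    )


candidates = [
    {"id": "chunk-a", "vector_score": 0.80, "hop_distance": None},
    {"id": "chunk-b", "vector_score": 0.50, "hop_distance": 0},
    {"id": "chunk-c", "vector_score": 0.50, "hop_distance": 1},
]
for chunk in rank_chunks(candidates):
    score = hybrid_score(chunk["vector_score"], chunk["hop_distance"])
    print(f"{chunk['id']}: {score:.3f}")
```

With these weights, a chunk directly linked to a query entity can outrank a chunk that has higher vector similarity but no graph connection.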
By combining the semantic matching of vector embeddings with the structured reasoning of knowledge graphs, you can build retrieval systems that do more than find similar text: they can follow explicit relationships between concepts and show the paths behind each answer.
Written by Milind Zodge