RAG Agnostic Agentic AI: The OmniScience Design Pattern

We all want our AI agents to be database-agnostic, query-language-agnostic, and API-agnostic. With the help of LLMs, this is now possible: we can use an LLM to build queries on the fly, let the RAG layer retrieve the relevant data, send that data back to the LLM, and finally return the answer to the end user. In this article, I'll elaborate on the OmniScience design pattern: its classes, code, and architecture at the intersection of AI agents, databases, queries, and APIs.
The OmniScience Design Pattern
This design pattern comprises these classes:

OmniConnector: Handles database connections and query execution for agnostic RAG
- Supports multiple database types
- Async connection management
- Error handling for queries

OmniQuery: Handles LLM-based query generation for any query language
- Query templates for different databases
- Intention translation logic
- Database-specific query formatting

OmniChain: Chains the output of the queries from all OmniConnector connections

OmniScience: Main orchestrator class
- Processes user intentions by asking OmniQuery to build queries with the LLM and executing them on OmniConnector
- Manages concurrent query execution
- Aggregates results
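
The "concurrent query execution" and "aggregates results" responsibilities can be sketched in a few lines. This is only an illustration under my own assumptions, not the orchestrator's actual code: `run_all`, the `Executor` type, and the two stub executors are hypothetical names standing in for real connector methods.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical executor type: takes a query string and returns rows.
Executor = Callable[[str], Awaitable[Any]]

async def run_all(queries: Dict[str, str], executors: Dict[str, Executor]) -> Dict[str, Any]:
    """Run one query per backend concurrently and aggregate results by backend name."""
    names = list(queries)
    results = await asyncio.gather(
        *(executors[name](queries[name]) for name in names),
        return_exceptions=True,  # a failing backend does not cancel the others
    )
    return dict(zip(names, results))

# Stub executors standing in for real database calls (illustrative only).
async def fake_postgres(query: str) -> list:
    await asyncio.sleep(0.01)
    return [{"backend": "postgres", "query": query}]

async def fake_neo4j(query: str) -> list:
    await asyncio.sleep(0.01)
    raise RuntimeError("neo4j unavailable")

def demo() -> Dict[str, Any]:
    return asyncio.run(run_all(
        {"postgres": "SELECT 1", "neo4j": "MATCH (n) RETURN n"},
        {"postgres": fake_postgres, "neo4j": fake_neo4j},
    ))
```

With `return_exceptions=True`, the aggregated dictionary holds either the rows or the exception for each backend, so the caller can decide whether a partial answer is good enough to send to the LLM.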
The OmniConnector Class: Encapsulate Everything Databases/Resources (Graph/Neo4j, PostgreSQL, MemoryDb/Redis, Vector/OpenSearch & Tool[API]/graphQL, API)
from typing import Optional, Any, Dict, List
import asyncio
import aiohttp
from neo4j import GraphDatabase
import psycopg2
import redis
from opensearchpy import OpenSearch
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from openai import AsyncOpenAI  # async client, so its calls can be awaited
import json


class OmniConnector:
    """
    One entry point for every backing store and for the LLM.
    Note: the Neo4j, PostgreSQL, Redis and OpenSearch clients used here are
    synchronous, so their calls block the event loop; wrap them in
    asyncio.to_thread() if the queries must really run concurrently.
    """

    def __init__(self, config: Dict[str, Dict[str, Any]]):
        """
        Initialize connections to different databases and services
        :param config: Dictionary containing connection details for all services
        """
        self.__neo4j_driver = None
        self.__postgres_conn = None
        self.__redis_client = None
        self.__opensearch_client = None
        self.__graphql_client = None
        self.__llm_client = None
        self.__initialize_connections(config)

    def __initialize_connections(self, config: Dict[str, Dict[str, Any]]) -> None:
        """
        Initialize all database connections with provided configuration
        """
        try:
            # Initialize Neo4j connection
            self.__neo4j_driver = GraphDatabase.driver(
                config['neo4j']['uri'],
                auth=(config['neo4j']['user'], config['neo4j']['password'])
            )
            # Initialize PostgreSQL connection
            self.__postgres_conn = psycopg2.connect(
                dbname=config['postgres']['dbname'],
                user=config['postgres']['user'],
                password=config['postgres']['password'],
                host=config['postgres']['host']
            )
            # Initialize Redis connection
            self.__redis_client = redis.Redis(
                host=config['redis']['host'],
                port=config['redis']['port'],
                password=config['redis']['password']
            )
            # Initialize OpenSearch connection
            self.__opensearch_client = OpenSearch(
                hosts=[config['opensearch']['host']],
                http_auth=(config['opensearch']['user'], config['opensearch']['password'])
            )
            # Initialize GraphQL client
            transport = AIOHTTPTransport(url=config['graphql']['endpoint'])
            self.__graphql_client = Client(transport=transport)
            # Initialize OpenAI client (async, because generate_query awaits it)
            self.__llm_client = AsyncOpenAI(api_key=config['openai']['api_key'])
        except Exception as e:
            raise ConnectionError(f"Failed to initialize connections: {str(e)}") from e

    async def generate_query(self, query_type: str, description: str) -> str:
        """
        Generate database queries using LLM
        :param query_type: Type of query to generate (SQL, Cypher, SPARQL, etc.)
        :param description: Natural language description of the query
        :return: Generated query string
        """
        try:
            prompt = f"Generate a {query_type} query for the following requirement: {description}"
            response = await self.__llm_client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": f"You are an expert in {query_type} query generation."},
                    {"role": "user", "content": prompt}
                ]
            )
            return response.choices[0].message.content
        except Exception as e:
            raise Exception(f"Failed to generate {query_type} query: {str(e)}") from e

    async def execute_neo4j_query(self, query: str, params: Optional[Dict] = None) -> List[Dict]:
        """Execute Neo4j query"""
        try:
            with self.__neo4j_driver.session() as session:
                result = session.run(query, params or {})
                return [record.data() for record in result]
        except Exception as e:
            raise Exception(f"Neo4j query execution failed: {str(e)}") from e

    async def execute_postgres_query(self, query: str, params: Optional[tuple] = None) -> List[Dict]:
        """Execute PostgreSQL query"""
        try:
            with self.__postgres_conn.cursor() as cursor:
                cursor.execute(query, params or ())
                columns = [desc[0] for desc in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
        except Exception as e:
            raise Exception(f"PostgreSQL query execution failed: {str(e)}") from e

    async def execute_redis_command(self, command: str, *args) -> Any:
        """Execute Redis command"""
        try:
            # Look up the client method by name, e.g. "get" or "set"
            return getattr(self.__redis_client, command)(*args)
        except Exception as e:
            raise Exception(f"Redis command execution failed: {str(e)}") from e

    async def execute_opensearch_query(self, index: str, query: Dict) -> Dict:
        """Execute OpenSearch query"""
        try:
            return self.__opensearch_client.search(index=index, body=query)
        except Exception as e:
            raise Exception(f"OpenSearch query execution failed: {str(e)}") from e

    async def execute_graphql_query(self, query: str, variables: Optional[Dict] = None) -> Dict:
        """Execute GraphQL query"""
        try:
            return await self.__graphql_client.execute_async(
                gql(query),
                variable_values=variables
            )
        except Exception as e:
            raise Exception(f"GraphQL query execution failed: {str(e)}") from e

    def __del__(self):
        """Cleanup connections"""
        if self.__neo4j_driver:
            self.__neo4j_driver.close()
        if self.__postgres_conn:
            self.__postgres_conn.close()
        if self.__redis_client:
            self.__redis_client.close()
        # OpenSearch and GraphQL clients don't require explicit cleanup
# Example usage:
async def main():
    config = {
        'neo4j': {
            'uri': 'bolt://localhost:7687',
            'user': 'neo4j',
            'password': 'password'
        },
        'postgres': {
            'host': 'localhost',
            'dbname': 'mydatabase',
            'user': 'postgres',
            'password': 'password'
        },
        'redis': {
            'host': 'localhost',
            'port': 6379,
            'password': 'password'
        },
        'opensearch': {
            'host': 'localhost:9200',
            'user': 'admin',
            'password': 'admin'
        },
        'graphql': {
            'endpoint': 'http://localhost:8080/graphql'
        },
        'openai': {
            'api_key': 'your-api-key'
        }
    }
    # generate_query and the execute_* methods live on OmniConnector
    omni = OmniConnector(config)

    # Generate and execute a SQL query
    sql_query = await omni.generate_query(
        'SQL',
        'Find all users who made purchases in the last 30 days'
    )
    results = await omni.execute_postgres_query(sql_query)
    print(results)

    # Generate and execute a Cypher query
    cypher_query = await omni.generate_query(
        'Cypher',
        'Find all friends of friends relationships up to 3 levels deep'
    )
    results = await omni.execute_neo4j_query(cypher_query)
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
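
The example above sends whatever the LLM returns straight to the database. A chat model often wraps its answer in a Markdown code fence, and nothing stops it from emitting a write statement, so a small guard between generation and execution seems prudent. The sketch below is my own assumption of what such a guard could look like; `extract_query` and `is_read_only_sql` are hypothetical helpers, and the keyword check is a heuristic, not a replacement for a read-only database role.

```python
import re

FENCE = "`" * 3  # Markdown code fence marker
_FENCED_BLOCK = re.compile(FENCE + r"[a-zA-Z]*\s*\n(.*?)" + FENCE, re.DOTALL)
_WRITE_KEYWORDS = re.compile(
    r"\b(insert|update|delete|drop|alter|truncate|create|grant)\b",
    re.IGNORECASE,
)

def extract_query(llm_text: str) -> str:
    """Return the first fenced block if the reply has one, else the stripped text."""
    match = _FENCED_BLOCK.search(llm_text)
    return (match.group(1) if match else llm_text).strip()

def is_read_only_sql(query: str) -> bool:
    """Heuristic: a single statement that starts with SELECT/WITH and has no write keyword."""
    statement = query.strip().rstrip(";")
    if ";" in statement:  # more than one statement
        return False
    if not re.match(r"(?is)^(select|with)\b", statement):
        return False
    # Conservative: also rejects queries whose string literals contain a write keyword.
    return _WRITE_KEYWORDS.search(statement) is None
```

In `main()`, the generated text would pass through `extract_query` first, and `execute_postgres_query` would be called only when `is_read_only_sql` returns `True`.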
OmniQuery: Ontology-Based LLM Query Generator (RDF, SPARQL, Cypher, GraphQL, Gremlin, JSON, SQL, Vector)
from enum import Enum
from typing import Optional, Any, Dict, List
from dataclasses import dataclass
import json
import os
from abc import ABC, abstractmethod


class QueryType(Enum):
    RDF = "rdf"
    SPARQL = "sparql"
    CYPHER = "cypher"
    GRAPHQL = "graphql"
    GREMLIN = "gremlin"
    JSON = "json"
    SQL = "sql"
    VECTOR = "vector"


@dataclass
class QueryResult:
    query: str
    query_type: QueryType
    metadata: Optional[Dict[str, Any]] = None
    ontology_mapping: Optional[Dict[str, Any]] = None


class QueryGenerator(ABC):
    @abstractmethod
    def generate(self, query_string: str, ontology: Dict[str, Any]) -> str:
        pass


class LLMQueryGenerator:
    def __init__(self, model_name: str = "gpt-3.5-turbo"):
        self.model_name = model_name
        # Initialize your preferred LLM client here
        # self.llm_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    def generate_prompt(self, query_type: QueryType, query_string: str, ontology: Dict[str, Any]) -> str:
        """Generate appropriate prompt based on query type and ontology"""
        prompts = {
            QueryType.RDF: """
            Given the following natural language query and ontology, generate a valid RDF/OWL query.
            Use the ontology structure to properly map relationships and concepts.
            Query: {query}
            Ontology: {ontology}
            Generate RDF/OWL query that:
            1. Uses proper RDF/OWL syntax
            2. Maps to the given ontology concepts
            3. Maintains semantic relationships
            4. Includes relevant prefixes
            """,
            QueryType.SPARQL: """
            Convert the following natural language query to SPARQL using the provided ontology.
            Query: {query}
            Ontology: {ontology}
            Requirements:
            1. Use proper SPARQL syntax
            2. Include necessary PREFIX declarations
            3. Map to ontology concepts
            4. Optimize for performance
            """,
            QueryType.CYPHER: """
            Transform this natural language query into an openCypher query using the ontology mapping.
            Query: {query}
            Ontology: {ontology}
            The Cypher query should:
            1. Use proper Cypher syntax
            2. Include node labels from ontology
            3. Map relationships correctly
            4. Include relevant properties
            """,
            # Add similar prompts for other query types...
        }
        template = prompts.get(query_type)
        if template is None:
            # Fail loudly instead of sending an empty prompt to the LLM
            raise ValueError(f"No prompt template defined for {query_type.value}")
        return template.format(
            query=query_string,
            ontology=json.dumps(ontology, indent=2)
        )
class OmniQuery:
    def __init__(self, query_string: str, ontology_path: Optional[str] = None):
        self.query_string = query_string.strip()
        self.ontology = self._load_ontology(ontology_path)
        self.llm_generator = LLMQueryGenerator()

    def _load_ontology(self, ontology_path: Optional[str]) -> Dict[str, Any]:
        """Load ontology from file or use default"""
        if ontology_path and os.path.exists(ontology_path):
            with open(ontology_path, 'r') as f:
                return json.load(f)
        return self._get_default_ontology()

    def _get_default_ontology(self) -> Dict[str, Any]:
        """Return default ontology structure"""
        return {
            "classes": {
                "Person": {
                    "properties": ["name", "age", "email"],
                    "relationships": ["knows", "worksFor"]
                },
                "Organization": {
                    "properties": ["name", "location"],
                    "relationships": ["employs", "partnersWith"]
                }
            },
            "relationships": {
                "knows": {
                    "domain": "Person",
                    "range": "Person"
                },
                "worksFor": {
                    "domain": "Person",
                    "range": "Organization"
                }
            }
        }

    def _generate_query_with_llm(self, query_type: QueryType) -> str:
        """Generate query using LLM"""
        prompt = self.llm_generator.generate_prompt(
            query_type,
            self.query_string,
            self.ontology
        )
        # Here you would call your LLM service
        # response = self.llm_client.generate(prompt)
        # return response.text
        # Placeholder for demonstration
        return f"Generated {query_type.value} query based on prompt: {prompt[:100]}..."

    def to_rdf(self) -> QueryResult:
        """Convert to RDF/OWL format using LLM"""
        query = self._generate_query_with_llm(QueryType.RDF)
        return QueryResult(
            query=query,
            query_type=QueryType.RDF,
            ontology_mapping=self._get_ontology_mapping(QueryType.RDF)
        )

    def to_sparql(self) -> QueryResult:
        """Convert to SPARQL query using LLM"""
        query = self._generate_query_with_llm(QueryType.SPARQL)
        return QueryResult(
            query=query,
            query_type=QueryType.SPARQL,
            ontology_mapping=self._get_ontology_mapping(QueryType.SPARQL)
        )

    def _get_ontology_mapping(self, query_type: QueryType) -> Dict[str, Any]:
        """Get ontology mapping for specific query type"""
        # Implementation would depend on your ontology structure
        return {
            "mapped_concepts": [],
            "relationships": [],
            "properties": []
        }

    def validate_query(self, query: str, query_type: QueryType) -> bool:
        """Validate generated query against ontology"""
        # Implement validation logic
        return True

    def optimize_query(self, query: str, query_type: QueryType) -> str:
        """Optimize generated query"""
        # Implement optimization logic
        return query

    def auto_detect_type(self) -> QueryType:
        """Pick a target query type when the caller gives none"""
        # Placeholder: the detection logic is not specified here, so default to SPARQL
        return QueryType.SPARQL

    def convert(self, target_type: Optional[QueryType] = None) -> QueryResult:
        """Convert query to specified type or auto-detect"""
        if target_type is None:
            target_type = self.auto_detect_type()
        conversion_methods = {
            QueryType.RDF: self.to_rdf,
            QueryType.SPARQL: self.to_sparql,
            # Add other conversion methods...
        }
        if target_type not in conversion_methods:
            raise NotImplementedError(f"No conversion method for {target_type.value}")
        result = conversion_methods[target_type]()
        # Validate and optimize
        if self.validate_query(result.query, target_type):
            result.query = self.optimize_query(result.query, target_type)
        return result
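
`validate_query` is left as a stub that always returns `True`. One way it could use the ontology is to check that every node label and relationship type in a generated Cypher query is actually declared. The sketch below is an assumption of mine rather than part of the pattern: the helper names are hypothetical, it expects the ontology layout returned by `_get_default_ontology`, and the regular expressions only catch the first label in `(p:Person:Employee)` or the first type in `[:A|B]`.

```python
import re
from typing import Any, Dict, Set

def cypher_labels(query: str) -> Set[str]:
    """Collect node labels such as Person from patterns like (p:Person)."""
    return set(re.findall(r"\(\s*\w*\s*:\s*(\w+)", query))

def cypher_relationship_types(query: str) -> Set[str]:
    """Collect relationship types such as worksFor from patterns like [r:worksFor]."""
    return set(re.findall(r"\[\s*\w*\s*:\s*(\w+)", query))

def unknown_terms(query: str, ontology: Dict[str, Any]) -> Set[str]:
    """Return labels and relationship types used in the query but absent from the ontology."""
    known_labels = set(ontology.get("classes", {}))
    known_relationships = set(ontology.get("relationships", {}))
    return (cypher_labels(query) - known_labels) | (
        cypher_relationship_types(query) - known_relationships
    )

# Trimmed copy of the default ontology shown above, for illustration.
ONTOLOGY = {
    "classes": {"Person": {}, "Organization": {}},
    "relationships": {"knows": {}, "worksFor": {}},
}
```

A `validate_query` built on this would return `not unknown_terms(query, self.ontology)` for Cypher, and a non-empty result could be fed back to the LLM as a correction hint.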
OmniChain: Chain Everything
from langchain.chains import LLMChain


class OmniChain:
    def __init__(self, llm, templates):
        """
        Initialize the chaining process with an LLM and a list of templates.
        :param llm: The language model to use.
        :param templates: A list of PromptTemplate objects defining the chain.
        """
        self.llm = llm
        self.templates = templates
        self.chains = self._create_chains()

    def _create_chains(self):
        """Creates a sequence of LLMChains based on the provided templates."""
        # LLMChain and chain.run are legacy LangChain APIs; newer releases deprecate them
        return [LLMChain(llm=self.llm, prompt=template) for template in self.templates]

    def run(self, docs):
        """
        Runs the chain sequentially on the input data.
        :param docs: List of text data to be processed sequentially.
        :return: Final output after processing through all chains.
        """
        previous_output = docs
        for chain in self.chains:
            # Each chain receives the output of the previous one
            previous_output = chain.run(previous_output)
        return previous_output
OmniScience: Ask Me Anything
Now that OmniConnector and OmniQuery are encapsulated, you can retrieve all the relevant pieces, chain them together, and send them to the LLM.
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI


class OmniScience:
    """Main class that orchestrates the entire RAG system"""

    def __init__(self, user_intention, config, llm, templates):
        # config is the connection dictionary expected by OmniConnector
        self.omni_connector = OmniConnector(config)
        self.omni_query = OmniQuery(user_intention)
        # Query results are passed to self.omni_chain.run(...) once retrieved
        self.omni_chain = OmniChain(llm, templates)
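
The chaining idea itself does not depend on LangChain: each step fills a template with the previous step's output and hands it to the model. Here is a dependency-free sketch of that loop, under my own assumptions. `run_chain`, `make_step`, and `echo_llm` are hypothetical names, and `echo_llm` merely upper-cases its prompt so the flow can be followed without an API key.

```python
from typing import Callable, Iterable

Step = Callable[[str], str]

def make_step(template: str, llm: Callable[[str], str]) -> Step:
    """Build a step that fills `{input}` in the template and calls the model."""
    def step(previous: str) -> str:
        return llm(template.format(input=previous))
    return step

def run_chain(steps: Iterable[Step], initial: str) -> str:
    """Feed the output of each step into the next one, in order."""
    output = initial
    for step in steps:
        output = step(output)
    return output

def echo_llm(prompt: str) -> str:
    """Stand-in for a real model call (illustrative only)."""
    return prompt.upper()

steps = [
    make_step("summarize: {input}", echo_llm),
    make_step("answer from: {input}", echo_llm),
]
final = run_chain(steps, "rows")
```

Swapping `echo_llm` for a function that calls a real model gives the same sequential behaviour as `OmniChain.run`, with the retrieved query results passed in as `initial`.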
Summary
Using LLMs to build queries for a multi-database environment, with a predefined ontology as guidance, gives an AI agent very broad query and retrieval capabilities driven by user intent. Because the queries are generated dynamically from the LLM's reading of the user's intention, the same design pattern can serve most kinds of RAG or GraphRAG: any source that has a connector and a query language the LLM can write becomes retrievable.
Written by Amit Sides
Amit Sides is a Backend Developer, DevOps Expert, DevSecOps & MLOps. GitHub: https://github.com/amitsides Technology Stack o AWS-EKS/AKS/GKE / Cloud-Native / Multi-Cloud o Microservices + MSK + SQS + KMS o Linux System Administrator / Ansible o Dockerfiles o Kubernetes Clusters + Scalability (Karpenter/KEDA) o K8s Services Controllers Ingresses, Nginx, Load Balancers, Istio, CNI, Cilium o Jenkins/GitHub Actions YAMLs, Builds ECR Registry (OCI) o Terraform + Terragrunt Provisioning (+Terraspace) o GitOps/ArgoCD/Flux/App-of-Apps o Databases RDS/MySQL/PostgreSQL/DynamoDB... o SRE, Observability, Logging, Monitoring, Alerting, Load Balancing, High Availability RESTful API Implementation + JWT Python Bash Scripting DevSecOps o eBPF/Kernel Security o Pod Security Admission + RBAC o CIS Kubernetes Benchmark o kube-bench o AppArmor o Seccomp o gVisor o Falco o Tetragon o Open Policy Agent o Trivy