RAG Agnostic Agentic AI: The OmniScience Design Pattern

Amit Sides

We all want our AI agents to be database agnostic, query-language agnostic, and API-calling agnostic. With the help of LLMs, this is now possible: we can have the LLM build queries on the fly for our RAG pipeline to retrieve relevant data, send the results back to the LLM, and finally return an answer to the end user. In this article, I'll elaborate on the OmniScience design pattern, its classes, code, and architecture at the intersection of AI agents, databases, queries, and APIs.

The OmniScience Design Pattern

This design pattern consists of these classes (a minimal sketch of how they fit together follows the list):

  1. OmniConnector: Handles database connections and query execution, keeping the RAG layer backend-agnostic

    • Supports multiple database types

    • Async connection management

    • Error handling for queries

  2. OmniQuery: Uses an LLM to generate queries in any query language

    • Query templates for different databases

    • Intention translation logic

    • Database-specific query formatting

  3. OmniChain: Chains the outputs of the queries from all OmniConnector executions

  4. OmniScience: Main orchestrator class

    • Processes user intentions by asking OmniQuery to build queries with the LLM, then executing them through OmniConnector

    • Manages concurrent query execution

    • Aggregates results
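
Before diving into each class, here is a minimal sketch of how the pieces are intended to fit together. This is an illustration under the assumptions of the implementations below (it uses OmniConnector's built-in generate_query helper); the llm and prompt_templates objects are hypothetical placeholders you would build with LangChain.

import asyncio
import json

# Minimal sketch of the OmniScience flow, assuming the classes defined below.
# `config`, `llm`, and `prompt_templates` are hypothetical placeholders.
async def ask(user_intention: str, config: dict, llm, prompt_templates) -> str:
    connector = OmniConnector(config)                             # 1. open all backend connections
    sql = await connector.generate_query("SQL", user_intention)   # 2. LLM builds the query
    rows = await connector.execute_postgres_query(sql)            # 3. execute against the backend
    chain = OmniChain(llm, prompt_templates)                      # 4. chain results through the LLM
    return chain.run(json.dumps(rows))                            # 5. final answer for the user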

The OmniConnector Class: Encapsulating Every Database/Resource (Graph/Neo4j, PostgreSQL, Memory/Redis, Vector/OpenSearch, and Tool/API via GraphQL)

from typing import Optional, Any, Dict, List
import asyncio
import aiohttp
from neo4j import GraphDatabase
import psycopg2
import redis
from opensearchpy import OpenSearch
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from openai import AsyncOpenAI
import json

class OmniConnector:
    def __init__(self, config: Dict[str, Dict[str, str]]):
        """
        Initialize connections to different databases and services

        :param config: Dictionary containing connection details for all services
        """
        self.__neo4j_driver = None
        self.__postgres_conn = None
        self.__redis_client = None
        self.__opensearch_client = None
        self.__graphql_client = None
        self.__llm_client = None

        self.__initialize_connections(config)

    def __initialize_connections(self, config: Dict[str, Dict[str, str]]) -> None:
        """
        Initialize all database connections with provided configuration
        """
        try:
            # Initialize Neo4j connection
            self.__neo4j_driver = GraphDatabase.driver(
                config['neo4j']['uri'],
                auth=(config['neo4j']['user'], config['neo4j']['password'])
            )

            # Initialize PostgreSQL connection
            self.__postgres_conn = psycopg2.connect(
                dbname=config['postgres']['dbname'],
                user=config['postgres']['user'],
                password=config['postgres']['password'],
                host=config['postgres']['host']
            )

            # Initialize Redis connection
            self.__redis_client = redis.Redis(
                host=config['redis']['host'],
                port=config['redis']['port'],
                password=config['redis']['password']
            )

            # Initialize OpenSearch connection
            self.__opensearch_client = OpenSearch(
                hosts=[config['opensearch']['host']],
                http_auth=(config['opensearch']['user'], config['opensearch']['password'])
            )

            # Initialize GraphQL client
            transport = AIOHTTPTransport(url=config['graphql']['endpoint'])
            self.__graphql_client = Client(transport=transport)

            # Initialize async OpenAI client (async so generate_query can await it)
            self.__llm_client = AsyncOpenAI(api_key=config['openai']['api_key'])

        except Exception as e:
            raise ConnectionError(f"Failed to initialize connections: {str(e)}")

    async def generate_query(self, query_type: str, description: str) -> str:
        """
        Generate database queries using LLM

        :param query_type: Type of query to generate (SQL, Cypher, SPARQL, etc.)
        :param description: Natural language description of the query
        :return: Generated query string
        """
        try:
            prompt = f"Generate a {query_type} query for the following requirement: {description}"

            response = await self.__llm_client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": f"You are an expert in {query_type} query generation."},
                    {"role": "user", "content": prompt}
                ]
            )

            return response.choices[0].message.content

        except Exception as e:
            raise Exception(f"Failed to generate {query_type} query: {str(e)}")

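    # Note: the executors below use synchronous drivers (neo4j, psycopg2, redis,
    # opensearch-py), so these calls block the event loop; in production you might
    # wrap them in asyncio.to_thread or switch to async drivers.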
    async def execute_neo4j_query(self, query: str, params: Optional[Dict] = None) -> List[Dict]:
        """Execute Neo4j query"""
        try:
            with self.__neo4j_driver.session() as session:
                result = session.run(query, params or {})
                return [record.data() for record in result]
        except Exception as e:
            raise Exception(f"Neo4j query execution failed: {str(e)}")

    async def execute_postgres_query(self, query: str, params: Optional[tuple] = None) -> List[Dict]:
        """Execute PostgreSQL query"""
        try:
            with self.__postgres_conn.cursor() as cursor:
                cursor.execute(query, params or ())
                columns = [desc[0] for desc in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
        except Exception as e:
            raise Exception(f"PostgreSQL query execution failed: {str(e)}")

    async def execute_redis_command(self, command: str, *args) -> Any:
        """Execute Redis command"""
        try:
            return getattr(self.__redis_client, command)(*args)
        except Exception as e:
            raise Exception(f"Redis command execution failed: {str(e)}")

    async def execute_opensearch_query(self, index: str, query: Dict) -> Dict:
        """Execute OpenSearch query"""
        try:
            return self.__opensearch_client.search(index=index, body=query)
        except Exception as e:
            raise Exception(f"OpenSearch query execution failed: {str(e)}")

    async def execute_graphql_query(self, query: str, variables: Optional[Dict] = None) -> Dict:
        """Execute GraphQL query"""
        try:
            return await self.__graphql_client.execute_async(
                gql(query),
                variable_values=variables
            )
        except Exception as e:
            raise Exception(f"GraphQL query execution failed: {str(e)}")

    def __del__(self):
        """Cleanup connections"""
        if self.__neo4j_driver:
            self.__neo4j_driver.close()
        if self.__postgres_conn:
            self.__postgres_conn.close()
        if self.__redis_client:
            self.__redis_client.close()
        # OpenSearch and GraphQL clients don't require explicit cleanup

# Example usage:
async def main():
    config = {
        'neo4j': {
            'uri': 'bolt://localhost:7687',
            'user': 'neo4j',
            'password': 'password'
        },
        'postgres': {
            'host': 'localhost',
            'dbname': 'mydatabase',
            'user': 'postgres',
            'password': 'password'
        },
        'redis': {
            'host': 'localhost',
            'port': 6379,
            'password': 'password'
        },
        'opensearch': {
            'host': 'localhost:9200',
            'user': 'admin',
            'password': 'admin'
        },
        'graphql': {
            'endpoint': 'http://localhost:8080/graphql'
        },
        'openai': {
            'api_key': 'your-api-key'
        }
    }

    omni = OmniConnector(config)

    # Generate and execute a SQL query
    sql_query = await omni.generate_query(
        'SQL',
        'Find all users who made purchases in the last 30 days'
    )
    results = await omni.execute_postgres_query(sql_query)
    print(results)

    # Generate and execute a Cypher query
    cypher_query = await omni.generate_query(
        'Cypher',
        'Find all friends of friends relationships up to 3 levels deep'
    )
    results = await omni.execute_neo4j_query(cypher_query)
    print(results)

if __name__ == "__main__":
    asyncio.run(main())
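
Since every executor is async, queries can also be fanned out concurrently, which is what OmniScience's "manages concurrent query execution" responsibility refers to. A minimal sketch, assuming the omni connector from the example above (the queries and the index name are illustrative):

# Minimal concurrency sketch, assuming the `omni` connector from the example above.
# The queries and the "documents" index are illustrative.
async def fan_out(omni: OmniConnector) -> Dict[str, Any]:
    graph, sql, vector = await asyncio.gather(
        omni.execute_neo4j_query("MATCH (n:Person) RETURN n LIMIT 5"),
        omni.execute_postgres_query("SELECT * FROM users LIMIT 5"),
        omni.execute_opensearch_query("documents", {"query": {"match_all": {}}}),
    )
    return {"graph": graph, "sql": sql, "vector": vector}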

OmniQuery: Ontology-Based LLM Query Generator (RDF, SPARQL, Cypher, GraphQL, Gremlin, JSON, SQL, Vector)

from enum import Enum
from typing import Optional, Any, Dict, List
from dataclasses import dataclass
import json
import os
from abc import ABC, abstractmethod

class QueryType(Enum):
    RDF = "rdf"
    SPARQL = "sparql"
    CYPHER = "cypher"
    GRAPHQL = "graphql"
    GREMLIN = "gremlin"
    JSON = "json"
    SQL = "sql"
    VECTOR = "vector"

@dataclass
class QueryResult:
    query: str
    query_type: QueryType
    metadata: Optional[Dict[str, Any]] = None
    ontology_mapping: Optional[Dict[str, Any]] = None

class QueryGenerator(ABC):
    """Extension point: alternative (non-LLM) query generators implement this interface."""

    @abstractmethod
    def generate(self, query_string: str, ontology: Dict[str, Any]) -> str:
        pass

class LLMQueryGenerator:
    def __init__(self, model_name: str = "gpt-3.5-turbo"):
        self.model_name = model_name
        # Initialize your preferred LLM client here
        # self.llm_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    def generate_prompt(self, query_type: QueryType, query_string: str, ontology: Dict[str, Any]) -> str:
        """Generate appropriate prompt based on query type and ontology"""
        prompts = {
            QueryType.RDF: """
            Given the following natural language query and ontology, generate a valid RDF/OWL query.
            Use the ontology structure to properly map relationships and concepts.

            Query: {query}
            Ontology: {ontology}

            Generate RDF/OWL query that:
            1. Uses proper RDF/OWL syntax
            2. Maps to the given ontology concepts
            3. Maintains semantic relationships
            4. Includes relevant prefixes
            """,

            QueryType.SPARQL: """
            Convert the following natural language query to SPARQL using the provided ontology.

            Query: {query}
            Ontology: {ontology}

            Requirements:
            1. Use proper SPARQL syntax
            2. Include necessary PREFIX declarations
            3. Map to ontology concepts
            4. Optimize for performance
            """,

            QueryType.CYPHER: """
            Transform this natural language query into an openCypher query using the ontology mapping.

            Query: {query}
            Ontology: {ontology}

            The Cypher query should:
            1. Use proper Cypher syntax
            2. Include node labels from ontology
            3. Map relationships correctly
            4. Include relevant properties
            """,

            # Add similar prompts for other query types...
        }

        return prompts.get(query_type, "").format(
            query=query_string,
            ontology=json.dumps(ontology, indent=2)
        )

class OmniQuery:
    def __init__(self, query_string: str, ontology_path: Optional[str] = None):
        self.query_string = query_string.strip()
        self.ontology = self._load_ontology(ontology_path)
        self.llm_generator = LLMQueryGenerator()

    def _load_ontology(self, ontology_path: Optional[str]) -> Dict[str, Any]:
        """Load ontology from file or use default"""
        if ontology_path and os.path.exists(ontology_path):
            with open(ontology_path, 'r') as f:
                return json.load(f)
        return self._get_default_ontology()

    def _get_default_ontology(self) -> Dict[str, Any]:
        """Return default ontology structure"""
        return {
            "classes": {
                "Person": {
                    "properties": ["name", "age", "email"],
                    "relationships": ["knows", "worksFor"]
                },
                "Organization": {
                    "properties": ["name", "location"],
                    "relationships": ["employs", "partnersWith"]
                }
            },
            "relationships": {
                "knows": {
                    "domain": "Person",
                    "range": "Person"
                },
                "worksFor": {
                    "domain": "Person",
                    "range": "Organization"
                }
            }
        }

    def _generate_query_with_llm(self, query_type: QueryType) -> str:
        """Generate query using LLM"""
        prompt = self.llm_generator.generate_prompt(
            query_type,
            self.query_string,
            self.ontology
        )

        # Here you would call your LLM service
        # response = self.llm_client.generate(prompt)
        # return response.text

        # Placeholder for demonstration
        return f"Generated {query_type.value} query based on prompt: {prompt[:100]}..."

    def to_rdf(self) -> QueryResult:
        """Convert to RDF/OWL format using LLM"""
        query = self._generate_query_with_llm(QueryType.RDF)
        return QueryResult(
            query=query,
            query_type=QueryType.RDF,
            ontology_mapping=self._get_ontology_mapping(QueryType.RDF)
        )

    def to_sparql(self) -> QueryResult:
        """Convert to SPARQL query using LLM"""
        query = self._generate_query_with_llm(QueryType.SPARQL)
        return QueryResult(
            query=query,
            query_type=QueryType.SPARQL,
            ontology_mapping=self._get_ontology_mapping(QueryType.SPARQL)
        )

    def _get_ontology_mapping(self, query_type: QueryType) -> Dict[str, Any]:
        """Get ontology mapping for specific query type"""
        # Implementation would depend on your ontology structure
        return {
            "mapped_concepts": [],
            "relationships": [],
            "properties": []
        }

    def validate_query(self, query: str, query_type: QueryType) -> bool:
        """Validate generated query against ontology"""
        # Implement validation logic
        return True

    def optimize_query(self, query: str, query_type: QueryType) -> str:
        """Optimize generated query"""
        # Implement optimization logic
        return query

    def auto_detect_type(self) -> QueryType:
        """Naive default when no target type is given; real logic could inspect the intent."""
        return QueryType.SPARQL

    def convert(self, target_type: Optional[QueryType] = None) -> QueryResult:
        """Convert query to specified type or auto-detect"""
        if target_type is None:
            target_type = self.auto_detect_type()

        conversion_methods = {
            QueryType.RDF: self.to_rdf,
            QueryType.SPARQL: self.to_sparql,
            # Add other conversion methods...
        }

        if target_type not in conversion_methods:
            raise NotImplementedError(f"No conversion implemented for {target_type.value}")

        result = conversion_methods[target_type]()

        # Validate and optimize
        if self.validate_query(result.query, target_type):
            result.query = self.optimize_query(result.query, target_type)

        return result
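
A quick, hypothetical usage example (the ontology path and the question are illustrative):

# Hypothetical usage of OmniQuery; "ontology.json" and the question are illustrative.
query = OmniQuery("Who works for Acme and knows Alice?", ontology_path="ontology.json")
result = query.convert(QueryType.SPARQL)
print(result.query_type.value)
print(result.query)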

OmniChain: Chain Everything

from langchain.chains import LLMChain

class OmniChain:
    def __init__(self, llm, templates):
        """
        Initialize the chaining process with an LLM and a list of templates.

        :param llm: The language model to use.
        :param templates: A list of PromptTemplate objects defining the chain.
        """
        self.llm = llm
        self.templates = templates
        self.chains = self._create_chains()

    def _create_chains(self):
        """Creates a sequence of LLMChains based on the provided templates."""
        return [LLMChain(llm=self.llm, prompt=template) for template in self.templates]

    def run(self, docs):
        """
        Runs the chain sequentially on the input data.

        :param docs: List of text data to be processed sequentially.
        :return: Final output after processing through all chains.
        """
        previous_output = docs
        for chain in self.chains:
            previous_output = chain.run(previous_output)
        return previous_output
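
A short, hypothetical usage example (the prompt texts are illustrative):

# Hypothetical usage of OmniChain; the prompt texts are illustrative.
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

summarize = PromptTemplate.from_template("Summarize these query results:\n{results}")
answer = PromptTemplate.from_template("Answer the user's question from this summary:\n{summary}")

chain = OmniChain(OpenAI(), [summarize, answer])
final = chain.run("...rows returned by OmniConnector...")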

OmniScience: Ask Me Anything

Now that we have OmniConnector and OmniQuery encapsulated, you can retrieve all the relevant pieces, chain them together, and send them to the LLM.

from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI
import json

class OmniScience:
    """Main class that orchestrates the entire RAG system"""

    def __init__(self, config, user_intention, llm=None):
        self.llm = llm or OpenAI()
        self.omni_connector = OmniConnector(config)
        self.omni_query = OmniQuery(user_intention)

    async def answer(self, templates):
        """Generate a query from the user intention, execute it, and chain the results."""
        sql = await self.omni_connector.generate_query("SQL", self.omni_query.query_string)
        rows = await self.omni_connector.execute_postgres_query(sql)
        return OmniChain(self.llm, templates).run(json.dumps(rows))
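
A hypothetical end-to-end usage, reusing the config dictionary from the OmniConnector example above (the prompt template is illustrative):

# Hypothetical end-to-end usage; `config` is the dictionary from the OmniConnector example.
prompt = PromptTemplate.from_template("Answer the user's question from these results:\n{results}")
omni = OmniScience(config, "Find all users who made purchases in the last 30 days")
print(asyncio.run(omni.answer([prompt])))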

Summary

Using LLMs to build the queries for our multi-database environment, with a predefined ontology as guidance, gives our AI agent broad query and retrieval capabilities driven by user intention. Dynamic query generation and retrieval based on the LLM's reading of the user's intention is extremely flexible: this design pattern can support virtually any kind of RAG/GraphRAG, because you can retrieve anything from anywhere with any query language.


Written by

Amit Sides

Amit Sides is a Backend Developer, DevOps Expert, DevSecOps & MLOps engineer. GitHub: https://github.com/amitsides

Technology stack:

  • AWS EKS/AKS/GKE / Cloud-Native / Multi-Cloud
  • Microservices + MSK + SQS + KMS
  • Linux system administration / Ansible
  • Dockerfiles
  • Kubernetes clusters + scalability (Karpenter/KEDA)
  • K8s service controllers, Ingresses, Nginx, load balancers, Istio, CNI, Cilium
  • Jenkins/GitHub Actions YAMLs, builds, ECR registry (OCI)
  • Terraform + Terragrunt provisioning (+Terraspace)
  • GitOps/ArgoCD/Flux/App-of-Apps
  • Databases: RDS/MySQL/PostgreSQL/DynamoDB...
  • SRE, observability, logging, monitoring, alerting, load balancing, high availability
  • RESTful API implementation + JWT
  • Python and Bash scripting

DevSecOps:

  • eBPF/kernel security
  • Pod Security Admission + RBAC
  • CIS Kubernetes Benchmark / kube-bench
  • AppArmor, Seccomp, gVisor, Falco, Tetragon, OpenPolicyAgent, Trivy