Building a Multi-Agent System for Enterprise Customer Support with Lyzr and Qdrant

M Quamer Nasim
32 min read

Every buyer expects prompt, precise, and personalized assistance in real time. Customer support used to be handled manually, by salespeople across the counter or over the phone, but that approach does not scale. Traditional chatbots brought consistency across interactions, yet they struggle with conversational memory and complex, multi-turn conversations.

Multi-agent systems can offer a solution here. These systems emulate human-like collaboration, with various specialized agents working cohesively to achieve objectives. In this blog post, we will construct an intelligent customer support agent utilizing Lyzr for orchestration and Qdrant for data storage and retrieval, creating a scalable solution that enables businesses to offer truly context-aware, real-time customer support.

Problem Statement & Solution

The goal is to create a customer support agent capable of:

  • Understanding conversation context across multiple turns.

  • Retrieving relevant information from customer data, helpdesk logs, FAQs, policies, and handbooks.

  • Classifying and routing issues appropriately.

  • Analyzing sentiment to assess customer satisfaction.

  • Escalating issues when necessary.

  • Providing empathetic, helpful responses.

This tutorial will guide companies in building a complete end-to-end customer support system. While this is a demo, it can be extended into a full-fledged product, as it incorporates all the complex components typically found in a production-ready system. Additionally, this guide will help developers understand how to build Multi-Agent Systems (MAS). The system offers developers a scalable framework for creating context-aware, enterprise-grade support solutions, ultimately improving customer satisfaction and operational efficiency.

High-Level System Architecture

Our multi-agent system comprises several components:

  • Agents: Specialized entities handling tasks like tenant resolution, customer info extraction, ticket retrieval, FAQ/policy/handbook retrieval, routing, sentiment analysis, escalation, and response generation.

  • Orchestrator (Lyzr): Manages agent lifecycles, facilitates inter-agent communication, and orchestrates the workflow.

  • Search/Retrieval (Qdrant): A vector database for storing and retrieving multi-index data using hybrid search.

  • Context Store: Maintains conversation history and contextual data.

Qdrant’s Role

Qdrant serves as the central vector store, enabling:

  • Multi-Index Data Storage: Stores customer data, helpdesk logs, and knowledge base content, indexed by different keywords for faster search.

  • Fast Hybrid Search: Combines dense and sparse vectors for accurate retrieval.

  • Payload-Indexed Metadata Filtering: Filters results by tenant, customer, or source type.

  • Context Storage: Persists agent memory for multi-turn interactions.
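
To make the hybrid search bullet more concrete, here is a minimal, library-free sketch of reciprocal rank fusion (RRF), one common way to merge a dense ranking and a sparse ranking into a single list. This post does not state which fusion method the system uses, so treat RRF, the function name `rrf_fuse`, the constant `k=60`, and the example result lists as illustrative assumptions.

```python
from collections import defaultdict


def rrf_fuse(rankings, k=60):
    """Fuse several ranked lists of IDs with reciprocal rank fusion.

    Each ID scores sum(1 / (k + rank)) over the lists it appears in,
    where rank starts at 1. A higher fused score means more relevant.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Sort by fused score (descending); break ties by ID for determinism.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


# Hypothetical result lists for one query (IDs borrowed from the sample data).
dense_hits = ["FAQ-002", "POL-001", "KB-008"]
sparse_hits = ["POL-001", "FAQ-001", "FAQ-002"]

fused = rrf_fuse([dense_hits, sparse_hits])
print(fused)
```

Items that rank well in both lists rise to the top, which is why hybrid retrieval tends to be more robust than either ranking alone. Qdrant can perform this kind of fusion on the server side, so a sketch like this is only meant to show the idea.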

Lyzr’s Role

Lyzr orchestrates the system by:

  • Agent Lifecycle Management: Creates and manages agent instances.

  • Inter-Agent Communication: Passes context between agents.

  • Real-Time Query Routing: Directs queries and fuses results.

Lyzr provides two solutions: a no-code platform and a low-code platform. With the no-code platform, users can build agents using the provided UI elements, though this approach offers less control over how the agents are defined. In contrast, the low-code Lyzr library, Lyzr Automata, provides a cleaner implementation. It retains the simplicity of low-code development while giving users greater flexibility and control in defining their agents.
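
The idea of passing context between agents can be illustrated without any framework. The sketch below is a plain-Python stand-in, not Lyzr's API: `resolve_tenant`, `route`, and `run_pipeline` are hypothetical names, and simple keyword checks stand in for the LLM calls a real agent would make.

```python
from typing import Callable, Dict, List

Context = Dict[str, object]
Step = Callable[[Context], Context]


def resolve_tenant(ctx: Context) -> Context:
    """Toy tenant resolver: a keyword match instead of an LLM call."""
    query = str(ctx["query"]).lower()
    tenant = "fintech" if "loan" in query or "account balance" in query else "ecom"
    return {**ctx, "tenant_id": tenant}


def route(ctx: Context) -> Context:
    """Toy router: pick a queue from the query text."""
    query = str(ctx["query"]).lower()
    queue = "billing" if "refund" in query or "charge" in query else "general"
    return {**ctx, "queue": queue}


def run_pipeline(query: str, steps: List[Step]) -> Context:
    """Run each step in order, handing the accumulated context to the next."""
    ctx: Context = {"query": query}
    for step in steps:
        ctx = step(ctx)
    return ctx


result = run_pipeline("I want a refund for order ORD-0001", [resolve_tenant, route])
print(result)
```

Each step returns a new context rather than mutating the old one, which keeps it easy to log or replay what every agent saw.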

Prerequisites

Before implementation, let’s set up the following:

  • Lyzr: Install the Lyzr automata library.
# uv is used as the package manager
# this command installs uv itself; the project dependencies are installed later
curl -Ls https://astral.sh/uv/install.sh | bash

Let’s create the dependency file pyproject.toml:

[project]
name = "lyzr-qdrant-multi-agent-system"
version = "0.1.0"
description = "An enterprise-grade, multi-agent customer support system built with the Lyzr agentic framework and Qdrant vector database."
license = {text = "MIT"}
readme = "README.md"
requires-python = "==3.12.0"
dependencies = [
    "lyzr-automata",
    "qdrant-client",
    "fastembed>=0.2.0",
    "pandas",
    "transformers",
    "torch",
    "accelerate",
    "bitsandbytes",
    "python-dotenv",
    "google-generativeai",
    "ipykernel>=6.30.0",
    "ipywidgets>=8.1.7",
    "bs4>=0.0.2",
    "llama-index-embeddings-huggingface>=0.6.0",
    "torchvision>=0.22.1",
]

[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

Let’s install the dependencies.

uv lock
uv sync
  • Qdrant: Run a Qdrant instance locally using Docker.
docker run -p 6333:6333 -p 6334:6334 -v $(pwd)/qdrant_storage:/qdrant/storage:z qdrant/qdrant

For more details on setting up Qdrant, visit the Qdrant Documentation.
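
Before running the setup scripts, it can help to confirm that the container is accepting connections. The sketch below only checks that a TCP port is open, using the standard library; the helper name `port_is_open` is hypothetical, and the host and port are the ones from the docker command above.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    status = "reachable" if port_is_open("localhost", 6333) else "not reachable"
    print(f"Qdrant REST port 6333 is {status}")
```

An open port does not prove that Qdrant is healthy, only that something is listening, so treat this as a quick first check.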

Dataset: Use sample data for tenants “ecom” and “fintech” (e.g., CRM records, helpdesk logs, FAQs). Download from the link below or prepare your own. Link to the dataset I have used: https://github.com/quamernasim/multi-agentic-customer-support-lyzr-qdrant/tree/main/data

This is what the dataset structure looks like, with multiple tenants and files serving as the knowledge base.

├── ecom
│   ├── crm_records.csv
│   ├── helpdesk_logs.csv
│   ├── images
│   │   ├── ORD-0001.jpg
│   │   ├── ORD-0002.jpg
│   │   ├── ORD-0003.jpg
│   │   ├── ORD-0004.jpg
│   │   ├── ORD-0005.jpg
│   │   ├── ......
│   ├── knowledge_base
│   │   ├── faqs.json
│   │   ├── handbook.json
│   │   └── policy.json
│   └── orders.csv
└── fintech
    ├── crm_records.csv
    ├── helpdesk_logs.csv
    └── knowledge_base
        ├── faqs.json
        ├── handbook.json
        └── policy.json

It also contains multi-modal data scraped from Amazon, with the corresponding script available in the GitHub repository.
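
Because the ingestion step reads these files by path, a quick layout check can catch a missing file early. This is a minimal sketch that assumes the tree shown above; `EXPECTED_FILES` and `missing_files` are hypothetical names, and orders.csv and the images folder are left out of the check because only the ecom tenant has them.

```python
from pathlib import Path

# Files every tenant folder is expected to contain, per the tree above.
EXPECTED_FILES = [
    "crm_records.csv",
    "helpdesk_logs.csv",
    "knowledge_base/faqs.json",
    "knowledge_base/handbook.json",
    "knowledge_base/policy.json",
]


def missing_files(data_root, tenants=("ecom", "fintech")):
    """Return the expected files that are absent, as paths relative to data_root."""
    root = Path(data_root)
    missing = []
    for tenant in tenants:
        for rel in EXPECTED_FILES:
            if not (root / tenant / rel).is_file():
                missing.append(f"{tenant}/{rel}")
    return missing


if __name__ == "__main__":
    problems = missing_files("data")
    print("Dataset layout OK" if not problems else f"Missing: {problems}")
```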

Example of CRM records data:

customer_id,name,email,phone,join_date,subscription_plan,status,last_interaction_date,lifetime_value,tags
CUST-001,John Doe,john.doe@example.com,+1-555-1234,2021-03-15,Premium,Active,2025-07-12,12450.50,"high-value,loyal"
CUST-002,Maria Singh,maria.singh@example.com,+91-98765-43210,2022-06-20,Basic,Active,2025-06-30,300.00,"churn-risk"
CUST-003,Liam Chen,liam.chen@example.com,+44-7700-900123,2020-11-08,Enterprise,Suspended,2025-04-19,24200.00,"high-value,escalation"
CUST-004,Sophia Brown,sophia.brown@example.com,+1-555-9876,2023-01-12,Free,Active,2025-07-18,50.00,"new-user"

Example of Helpdesk logs data:

ticket_id,customer_id,opened_date,closed_date,issue_summary,status,priority,resolution_notes,category,assigned_agent,resolution_time_hours
TCK-1001,CUST-001,2025-07-10,2025-07-11,"Unable to login to account",Closed,High,"Reset password and cleared cache",Technical,Agent-23,4.5
TCK-1002,CUST-002,2025-07-05,2025-07-06,"Incorrect billing amount",Closed,Medium,"Refund issued and billing cycle corrected",Billing,Agent-11,8.2
TCK-1003,CUST-003,2025-07-01,,"Subscription renewal failed",Open,Urgent,,"Billing",Agent-07,
TCK-1004,CUST-004,2025-07-09,2025-07-09,"Two-factor authentication not working",Closed,High,"Re-synced device and verified settings",Technical,Agent-15,3.0
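
Note that open tickets leave closed_date, resolution_notes, and resolution_time_hours empty (see TCK-1003). The short sketch below, using only the standard library, shows one way to handle those blanks when reading the file. `open_tickets` is a hypothetical helper, and the inline rows are copied from the sample above.

```python
import csv
import io

SAMPLE = '''ticket_id,customer_id,opened_date,closed_date,issue_summary,status,priority,resolution_notes,category,assigned_agent,resolution_time_hours
TCK-1001,CUST-001,2025-07-10,2025-07-11,"Unable to login to account",Closed,High,"Reset password and cleared cache",Technical,Agent-23,4.5
TCK-1003,CUST-003,2025-07-01,,"Subscription renewal failed",Open,Urgent,,"Billing",Agent-07,
'''


def open_tickets(csv_text):
    """Return rows whose status is Open, with empty fields turned into None."""
    rows = csv.DictReader(io.StringIO(csv_text))
    result = []
    for row in rows:
        if row["status"] == "Open":
            result.append({key: (value or None) for key, value in row.items()})
    return result


tickets = open_tickets(SAMPLE)
print(tickets[0]["ticket_id"], tickets[0]["closed_date"])
```

Converting blanks to None explicitly keeps the stored records unambiguous; pandas, which the ingestion script uses, reads the same blanks as NaN.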

Example of Order data:

order_id,customer_id,order_date,product_name,product_category,quantity,unit_price,total_amount,payment_status,delivery_status,payment_method,shipping_address
ORD-0001,CUST-001,2025-07-10,Noise Cancelling Headphones,Electronics,1,199.99,199.99,Paid,Delivered,Credit Card,"New York, USA"
ORD-0002,CUST-004,2025-07-12,Fitness Tracker,Wearables,2,79.99,159.98,Paid,Delivered,PayPal,"Los Angeles, USA"
ORD-0003,CUST-015,2025-07-11,Organic Cotton T-Shirt,Clothing,3,25.0,75.0,Paid,Shipped,Credit Card,"Hanoi, Vietnam"
ORD-0004,CUST-010,2025-07-09,Wireless Mouse,Electronics,1,29.99,29.99,Paid,Delivered,Wallet,"Seoul, South Korea"
ORD-0005,CUST-022,2025-07-08,Laptop Stand,Accessories,1,45.5,45.5,Paid,Delivered,Bank Transfer,"Chicago, USA"

Example of FAQs for knowledge base:

  {
    "faq_id": "FAQ-001",
    "question": "How do I track my order?",
    "answer": "You can track your order using the 'Track Order' option in your account dashboard or via the tracking link sent in your confirmation email.",
    "tags": ["orders"]
  },
  {
    "faq_id": "FAQ-002",
    "question": "What should I do if my order is delayed?",
    "answer": "If your order is delayed, please check the tracking status. If there is no update for 48 hours, contact our support team for assistance.",
    "tags": ["delivery"]
  },

Example of Handbook for knowledge base:

  {
    "kb_id": "KB-008",
    "title": "How to Update Delivery Address",
    "content": "Delivery addresses can be updated in 'My Account' > 'Saved Addresses'. For active orders, address changes are allowed only if the order has not yet shipped. To update an address for an unshipped order, visit 'My Orders', select the order, and click 'Edit Address'. Provide accurate and complete details, including landmarks and contact numbers, to avoid delivery issues. For shipped orders, address changes are not possible, but you can request rescheduling through the courier’s tracking page. Maintaining updated address records ensures faster checkouts and minimizes failed delivery attempts.",
    "tags": ["shipping"]
  },
  {
    "kb_id": "KB-009",
    "title": "Understanding Store Credit Usage",
    "content": "Store credits are issued for eligible refunds or promotional campaigns and can be used to pay for future purchases. At checkout, available store credits are automatically applied, reducing your payable amount. If your order value exceeds your store credit balance, you can pay the difference using other payment methods. Store credits cannot be withdrawn or transferred between accounts and expire after 12 months. If store credits fail to apply automatically, ensure you're logged into the correct account. Credits used on canceled orders will be refunded back to your store credit balance.",
    "tags": ["payments"]
  },

Example of Policy for knowledge base:


  {
    "policy_id": "POL-001",
    "title": "Return and Exchange Policy",
    "description": "Our Return and Exchange Policy allows customers to return or exchange eligible items within 7 days of delivery. To qualify, the item must be unused, unwashed, and in its original packaging with all tags intact. For electronics and high-value items, the product should be sealed and unopened. Once a return request is initiated, our logistics team will schedule a pickup within 2-4 working days. After inspection of the returned item, we will either issue a replacement or process a refund as per the customer’s request. Please note that certain product categories such as personal care, lingerie, customized products, and perishable goods are non-returnable unless received damaged or defective. Items damaged due to misuse or mishandling will not be accepted. Our exchange option is limited to size and color variations of the same product, subject to stock availability. If an exchange is not possible, a refund will be issued as per our Refund Policy. The company reserves the right to deny any return if the returned product does not meet our quality guidelines.",
    "tags": ["returns", "refunds"]
  },
  {
    "policy_id": "POL-002",
    "title": "Refund Policy",
    "description": "Refunds are processed only after successful inspection of returned products or in cases of failed or canceled orders. Refunds will be credited to the original payment method within 5-7 business days, depending on your bank or payment provider. If the payment was made using COD, refunds will be processed through a bank transfer or store credit. For partial refunds due to damaged items or shipping discrepancies, the refunded amount will correspond to the value of the affected product(s) only. In rare cases, if refund processing is delayed due to bank or payment gateway issues, we request customers to allow 2 additional business days before escalation. Refunds for promotional discounts or coupon-based purchases will exclude the coupon value unless explicitly mentioned. If a refund fails due to incorrect payment details provided by the customer, we may require verification documents for re-processing. Refund requests related to fraudulent activity, suspicious returns, or excessive returns from a single account may be reviewed and declined as per our Fraud Prevention Guidelines.",
    "tags": ["refunds"]
  },

Example of product image:

Implementation Steps

Let’s walk through the code to build this system step-by-step.

Step 1: Setting Up Qdrant Collections

In src/qdrant_util/setup_qdrant.py, we create collections for user data, the knowledge base, the semantic cache, and orders.

from qdrant_client import QdrantClient, models

DENSE_EMBEDDING_SIZE = 384
IMAGE_EMBEDDING_SIZE = 512
client = QdrantClient(host="localhost", port=6333)

def create_or_recreate_collection(
    name: str, 
    indexes: dict, 
    use_sparse: bool = True, 
    use_image: bool = False,
    use_hnsw_optimization: bool = False,
    distance_metric: models.Distance = models.Distance.COSINE
):
    """Creates a new Qdrant collection with advanced options."""
    try:
        client.get_collection(collection_name=name)
        print(f"Collection '{name}' already exists. Recreating it for a clean slate.")
        client.delete_collection(collection_name=name)
    except Exception:
        pass

    print(f"Creating collection '{name}'...")

    hnsw_config = None
    if use_hnsw_optimization:
        print(f"Applying HNSW optimization for collection '{name}'.")
        hnsw_config = models.HnswConfigDiff(
            payload_m=16,
            m=0,
            on_disk=True,
        )

    sparse_vectors_config = None
    if use_sparse:
        sparse_vectors_config = {
            "sparse": models.SparseVectorParams(
                index=models.SparseIndexParams(
                    on_disk=True
                )
            )
        }

    vectors_config={
        "dense": models.VectorParams(
            size=DENSE_EMBEDDING_SIZE,
            distance=distance_metric,
            on_disk=True,
        ),
    }
    if use_image:
        vectors_config['image'] = models.VectorParams(
            size=IMAGE_EMBEDDING_SIZE,
            distance=distance_metric,
            on_disk=True,   
        )

    client.create_collection(
        collection_name=name,
        vectors_config=vectors_config,
        sparse_vectors_config=sparse_vectors_config,
        hnsw_config=hnsw_config,
        on_disk_payload=True,
    )

    print(f"Creating payload indexes for '{name}'...")
    for field, field_type in indexes.items():
        client.create_payload_index(name, field, field_schema=field_type)
    print(f"Collection '{name}' created successfully.")


if __name__ == "__main__":
    user_data_indexes = {
        "tenant_id": models.KeywordIndexParams(type='keyword', is_tenant=True, on_disk=True),
        "customer_id": models.KeywordIndexParams(type='keyword', is_tenant=True, on_disk=True)
    }

    kb_indexes = {
        "tenant_id": models.KeywordIndexParams(type='keyword', is_tenant=True, on_disk=True),
        "tags": models.KeywordIndexParams(type='keyword', on_disk=True),
        "source_type": models.KeywordIndexParams(type='keyword', on_disk=True)
    }

    create_or_recreate_collection("user_data", indexes=user_data_indexes, use_hnsw_optimization=True)
    create_or_recreate_collection("knowledge_base", indexes=kb_indexes, use_hnsw_optimization=True)


    cache_indexes = {
        "tenant_id": models.KeywordIndexParams(type='keyword', is_tenant=True, on_disk=True),
        "customer_id": models.KeywordIndexParams(type='keyword', is_tenant=True, on_disk=True)
    }
    create_or_recreate_collection("semantic_cache", indexes=cache_indexes, use_sparse=False, distance_metric=models.Distance.EUCLID)

    order_indexes = {
        "tenant_id": models.KeywordIndexParams(type='keyword', is_tenant=True, on_disk=True),
        "customer_id": models.KeywordIndexParams(type='keyword', is_tenant=True, on_disk=True),
        "order_id": models.KeywordIndexParams(type='keyword', is_tenant=True, on_disk=True)
    }
    create_or_recreate_collection("orders", indexes=order_indexes, use_image=True)
    print("Qdrant setup complete.")

This script serves as the Qdrant collection setup utility for the multi-agent system.

  • Connection & Initialization — Establishes a connection to a Qdrant instance.

  • Configurable Collection Creation — Recreates collections from scratch, supporting dense, sparse, and optional image embeddings; includes HNSW optimization and configurable distance metrics (COSINE/EUCLID).

  • Payload Indexing — Builds metadata indexes (e.g., tenant_id, customer_id, tags, order_id) for fast, filter-based searches.

  • Specialized Collections:
    – user_data → Tenant & customer information
    – knowledge_base → Tagged, source-specific knowledge base documents
    – semantic_cache → Previous query–response pairs (sparse disabled, EUCLID distance)
    – orders → Order details with image embeddings for visual search
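
The payload indexes exist so that every search can be scoped to one tenant, and optionally to one customer or source type. As a rough illustration, the filter below is written in the JSON shape used by Qdrant's REST API (`must` conditions with `key` and `match`). The helper `tenant_filter` is a hypothetical name and is not part of the original code.

```python
import json


def tenant_filter(tenant_id, customer_id=None, source_type=None):
    """Build a Qdrant-style filter dict that scopes a search to one tenant."""
    must = [{"key": "tenant_id", "match": {"value": tenant_id}}]
    if customer_id is not None:
        must.append({"key": "customer_id", "match": {"value": customer_id}})
    if source_type is not None:
        must.append({"key": "source_type", "match": {"value": source_type}})
    return {"must": must}


print(json.dumps(tenant_filter("ecom", customer_id="CUST-001"), indent=2))
```

With the Python client, the same conditions would be expressed with models.Filter, models.FieldCondition, and models.MatchValue.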

Step 2: Ingesting Data

In src/qdrant_util/ingest_data.py, we process and upload data to Qdrant.

import os
import uuid
import json
import pandas as pd
from qdrant_client import QdrantClient
from tqdm import tqdm
from qdrant_client.models import PointStruct, SparseVector
from fastembed import SparseTextEmbedding, TextEmbedding, ImageEmbedding

DENSE_EMBEDDING_MODEL_NAME = "BAAI/bge-small-en-v1.5"
SPARSE_EMBEDDING_MODEL_NAME = "prithivida/Splade_PP_en_v1"
IMAGE_EMBEDDING_MODEL_NAME = "Qdrant/clip-ViT-B-32-vision"
DENSE_EMBEDDING_MODEL = TextEmbedding(model_name=DENSE_EMBEDDING_MODEL_NAME)
IMAGE_EMBEDDING_MODEL = ImageEmbedding(model_name=IMAGE_EMBEDDING_MODEL_NAME)
SPARSE_EMBEDDING_MODEL = SparseTextEmbedding(model_name=SPARSE_EMBEDDING_MODEL_NAME)
client = QdrantClient(host="localhost", port=6333)

def process_unstructured_files(directory_path, tenant_id, points_list):
    """Dynamically reads all JSON files from a given directory."""
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            filepath = os.path.join(directory_path, filename)
            source_name = filename.split('.')[0]

            with open(filepath, 'r') as f:
                data = json.load(f)

                for item in data:
                    category = item.get(
                        "question", item.get("title", "")
                    )
                    content = item.get(
                        "answer", item.get(
                            "content", item.get("description", "")
                        )
                    )
                    tags = item.get("tags", [])

                    if not content:
                        continue

                    text_to_embed = content
                    # `category` holds the question (FAQs) or the title (handbook, policy)
                    if source_name == 'faqs':
                        text_to_embed = f"Question: {category}\nAnswer: {content}"
                    elif source_name == 'handbook':
                        text_to_embed = f"Title: {category}\nContent: {content}"
                    elif source_name == 'policy':
                        text_to_embed = f"Policy Type: {category}\nPolicy Description: {content}"

                    payload = {
                        "tenant_id": tenant_id,
                        "source_type": source_name,
                        "tags": tags,
                        "content": text_to_embed,
                    }
                    points_list.append((text_to_embed, payload))

def process_multimodal_files(data_path, tenant_id, points_list):
    """Reads a tenant's orders.csv, if present, and pairs each order with its product image path."""
    order_path = f"{data_path}/{tenant_id}/orders.csv"
    if os.path.exists(order_path):
        order_df = pd.read_csv(order_path)

        for _, row in order_df.iterrows():
            text_to_embed = f"Product Name: {row['product_name']}, Product Category: {row['product_category']}"
            image_to_embed = f"{data_path}/{tenant_id}/images/{row['order_id']}.jpg"
            payload = {
                "tenant_id": tenant_id, 
                "source_type": "orders", 
                **row.to_dict(), 
                "text_embeded": text_to_embed, 
                "image_embedded": image_to_embed
            }
            points_list.append((text_to_embed, image_to_embed, payload))

def upsert_in_batch(text, payloads, collection_name, batch_size, image=None):
    dense = list(DENSE_EMBEDDING_MODEL.embed(text))
    sparse = list(SPARSE_EMBEDDING_MODEL.embed(text))
    if image:
        image_embeds = list(IMAGE_EMBEDDING_MODEL.embed(image))

    total = len(text)
    for i in tqdm(range(0, total, batch_size)):
        batch_dense_embeddings = dense[i:i + batch_size]
        batch_sparse_embeddings = sparse[i:i + batch_size]
        batch_payloads = payloads[i:i + batch_size]

        if image:
            batch_image_embeddings = image_embeds[i:i + batch_size]
            points = [
                PointStruct(
                    id=str(uuid.uuid4()),
                    vector={
                        "dense": dense_embeddings,
                        "sparse": SparseVector(indices=sparse_embeddings.indices, values=sparse_embeddings.values),
                        "image": image_embeddings,
                    },
                    payload=payload
                )
                for dense_embeddings, sparse_embeddings, image_embeddings, payload in zip(
                    batch_dense_embeddings, batch_sparse_embeddings, batch_image_embeddings, batch_payloads
                )
            ]
        else:
            points = [
                PointStruct(
                    id=str(uuid.uuid4()),
                    vector={
                        "dense": dense_embeddings,
                        "sparse": SparseVector(indices=sparse_embeddings.indices, values=sparse_embeddings.values)
                    },
                    payload=payload
                )
                for dense_embeddings, sparse_embeddings, payload in zip(batch_dense_embeddings, batch_sparse_embeddings, batch_payloads)
            ]

        client.upsert(
            collection_name=collection_name,
            points=points,
            wait=True
        )

def ingest_data(data_path, batch_size = 64):
    """Processes all data sources and uploads them to their respective collections."""
    user_data_points = []
    kb_points = []
    order_points = []

    tenants = ["ecom", "fintech"]
    for tenant in tenants:
        print(f"\n--- Processing data for tenant: {tenant} ---")

        crm_df = pd.read_csv(f"{data_path}/{tenant}/crm_records.csv")
        for _, row in crm_df.iterrows():
            text_to_embed = f"Customer: {row['name']}, Email: {row['email']}"
            payload = {"tenant_id": tenant, "source_type": "crm", **row.to_dict(), "text_embeded": text_to_embed}
            user_data_points.append((text_to_embed, payload))

        helpdesk_df = pd.read_csv(f"{data_path}/{tenant}/helpdesk_logs.csv")
        for _, row in helpdesk_df.iterrows():
            text_to_embed = f"Ticket: {row['issue_summary']}, Status: {row['status']}"
            payload = {"tenant_id": tenant, "source_type": "helpdesk", **row.to_dict(), "text_embeded": text_to_embed}
            user_data_points.append((text_to_embed, payload))

        kb_path = f"{data_path}/{tenant}/knowledge_base"
        process_unstructured_files(kb_path, tenant, kb_points)
        process_multimodal_files(data_path, tenant, order_points)

    user_data_texts, user_data_payloads = zip(*user_data_points)
    upsert_in_batch(user_data_texts, user_data_payloads, "user_data", batch_size)
    print(f"\nIngested {len(user_data_points)} points into 'user_data' collection.")

    kb_texts, kb_payloads = zip(*kb_points)
    upsert_in_batch(kb_texts, kb_payloads, "knowledge_base", batch_size)
    print(f"Ingested {len(kb_points)} points into 'knowledge_base' collection.")

    order_texts, order_image, order_payload = zip(*order_points)
    upsert_in_batch(order_texts, order_payload, "orders", batch_size, image=order_image)
    print(f"Ingested {len(order_points)} points into 'orders' collection.")

if __name__ == "__main__":
    ingest_data(data_path = 'data')

This script is the data ingestion pipeline for the multi-agent system, responsible for embedding and storing diverse data in Qdrant for fast retrieval.

  • Model Setup — Initializes dense (BAAI/bge-small-en-v1.5), sparse (prithivida/Splade_PP_en_v1), and image (Qdrant/clip-ViT-B-32-vision) embeddings.

  • Knowledge Base Processing — Reads tenant FAQs, policies, and handbooks (JSON), formats them with contextual labels, and prepares them for embedding.

  • Order Data Processing — Pairs order descriptions from orders.csv with corresponding product images for multimodal search.

  • Tenant Data Ingestion — Loads CRM and helpdesk logs into user_data, KB entries into knowledge_base, and product data into orders.

  • Batch Embedding & Upload — Generates embeddings and upserts them into the appropriate Qdrant collections.
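
The batching in upsert_in_batch boils down to slicing parallel sequences in steps of batch_size. Here is a stripped-down sketch of that pattern, with no embedding or Qdrant calls; `iter_batches` is a hypothetical helper name and the sample payloads are made up for illustration.

```python
def iter_batches(texts, payloads, batch_size):
    """Yield (texts, payloads) slices of at most batch_size items each."""
    if len(texts) != len(payloads):
        raise ValueError("texts and payloads must have the same length")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(texts), batch_size):
        end = start + batch_size
        yield texts[start:end], payloads[start:end]


# Hypothetical inputs: five documents split into batches of two.
texts = [f"doc {i}" for i in range(5)]
payloads = [{"tenant_id": "ecom", "n": i} for i in range(5)]

sizes = [len(batch_texts) for batch_texts, _ in iter_batches(texts, payloads, 2)]
print(sizes)
```

The last batch is simply shorter than the rest, so no padding or special casing is needed.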

Step 3: Defining Agents

In src/agents_util/agents.py, we define agents using Lyzr’s Agent class.

from lyzr_automata import Agent

TenantResolverAgent = Agent(
    role="TenantResolver",
    prompt_persona=(
        "You are the Tenant Resolver. Your task is to classify the customer query "
        "into one of the two categories: ecom, fintech.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"TenantResolver\",\n"
        "  \"response\": {\n"
        "    \"tenant_type\": \"<one of: ecom, fintech>\",\n"
        "    \"concise_reason\": \"<brief reason for classification>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)

OrderIDExtractorAgent = Agent(
    role="OrderIDExtractor",
    prompt_persona=(
        "You are the Order ID Extractor. Your task is to extract the order ID from the query. "
        "Example order IDs: ORD-0017, ORD-0149. An order ID is ORD, a hyphen, and a 4-digit number.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"OrderIDExtractor\",\n"
        "  \"response\": {\n"
        "    \"order_id\": \"<extracted order id>\",\n"
        "    \"concise_reason\": \"<brief reason for extraction>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)

ImagePathExtractorAgent = Agent(
    role="ImagePathExtractor",
    prompt_persona=(
        "You are the Image Path Extractor. Your task is to extract the image path from the query. "
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"ImagePathExtractor\",\n"
        "  \"response\": {\n"
        "    \"image_path\": \"<extracted image path>\",\n"
        "    \"concise_reason\": \"<brief reason for extraction>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)

OrderInfoExtractorAgent = Agent(
    role="OrderInfoExtractor",
    prompt_persona=(
        "You are the Order Info Extractor. Your task is to extract the main info "
        "from the full order info that will help in better answering the user query.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"OrderInfoExtractor\",\n"
        "  \"response\": {\n"
        "    \"order_info\": \"<extracted order info>\",\n"
        "    \"concise_reason\": \"<brief reason for extraction>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)



CustomerInfoExtractorAgent = Agent(
    role="CustomerInfoExtractor",
    prompt_persona=(
        "You are the Customer Info Extractor. Your task is to extract the customer info "
        "related to the user query from the provided full information about the customer.\n\n"
        "Return only those customer entries that will help in better answering the user query.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"CustomerInfoExtractor\",\n"
        "  \"response\": {\n"
        "    \"customer_info\": \"<customer info>\",\n"
        "    \"concise_reason\": \"<brief reason for filtering>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)

TicketExtractorAgent = Agent(
    role="TicketExtractor",
    prompt_persona=(
        "You are the Ticket Extractor. Your task is to extract the existing tickets "
        "related to the user query from the provided list of relevant tickets.\n\n"
        "Return only those tickets that will help in better answering the user query.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"TicketExtractor\",\n"
        "  \"response\": {\n"
        "    \"related_tickets\": \"<related filtered tickets>\",\n"
        "    \"concise_reason\": \"<brief reason for filtering>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)

FAQExtractorAgent = Agent(
    role="FAQExtractor",
    prompt_persona=(
        "You are the FAQ Extractor. Your task is to extract the FAQs "
        "related to the user query from the provided list of relevant FAQs.\n\n"
        "Return only those FAQs that will help in better answering the user query.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"FAQExtractor\",\n"
        "  \"response\": {\n"
        "    \"related_faqs\": \"<related filtered faqs>\",\n"
        "    \"concise_reason\": \"<brief reason for filtering>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)

PolicyExtractorAgent = Agent(
    role="PolicyExtractor",
    prompt_persona=(
        "You are the Policy Extractor. Your task is to extract the policies "
        "related to the user query from the provided list of relevant policies.\n\n"
        "Return only those policies that will help in better answering the user query.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"PolicyExtractor\",\n"
        "  \"response\": {\n"
        "    \"related_policies\": \"<related filtered policy>\",\n"
        "    \"concise_reason\": \"<brief reason for filtering>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)

HandbookExtractorAgent = Agent(
    role="HandbookExtractor",
    prompt_persona=(
        "You are the Handbook Extractor. Your task is to extract the handbook entries "
        "related to the user query from the provided list of relevant handbook entries.\n\n"
        "Return only those handbook entries that will help in better answering the user query.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"HandbookExtractor\",\n"
        "  \"response\": {\n"
        "    \"related_handbooks\": \"<related filtered handbook>\",\n"
        "    \"concise_reason\": \"<brief reason for filtering>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)


ProductQualityCheckAgent = Agent(
    role="ProductQualityChecker",
    prompt_persona=(
        "You are the Product Quality Checker. Your task is to compare a user-uploaded product image information "
        "against the original reference image information to determine if the uploaded product is damaged or a bit different. "
        "Return is not acceptable if the product is damaged. "
        "The max score possible is 0.5. Score of 0.5 means perfect match\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"ProductQualityChecker\",\n"
        "  \"response\": {\n"
        "    \"is_same_product\": \"<one of: yes, no>\",\n"
        "    \"defect_detected\": \"<one of: yes, no>\",\n"
        "    \"is_returnable\": \"<one of: yes, no>\",\n"
        "    \"concise_reason\": \"<brief justification for your conclusions>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)

ReturnValidationAgent = Agent(
    role="ReturnValidator",
    prompt_persona=(
        "You are the Return Item Validator. Your task is to check a user-uploaded image of a product being returned "
        "to verify if it is actually the same product as per the original product information. The return is acceptable if it is the same product but damaged or a bit different. "
        "Slight mismatch will work but not too much.\n\n"
        "The max score possible is 0.5. Score of 0.5 means perfect match\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"ReturnValidator\",\n"
        "  \"response\": {\n"
        "    \"is_same_product\": \"<one of: yes, no>\",\n"
        "    \"is_returnable\": \"<one of: yes, no>\",\n"
        "    \"concise_reason\": \"<brief reason of validation>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)



RouterAgent = Agent(
    role="Router",
    prompt_persona=(
        "You are the Support Router. Your task is to classify the customer issue "
        "into one of the following categories: billing, technical, general.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"Router\",\n"
        "  \"response\": {\n"
        "    \"issue_type\": \"<one of: billing, technical, general>\",\n"
        "    \"concise_reason\": \"<brief reason for classification>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always use exactly the above structure.\n"
        "- Do not include any additional text or explanation outside the JSON.\n"
    )
)

SentimentAgent = Agent(
    role="SentimentAnalyzer",
    prompt_persona=(
        "You are a sentiment classifier. Your task is to analyze the sentiment of a given text "
        "and classify it as one of the following categories: Positive, Neutral, or Negative.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"SentimentAnalyzer\",\n"
        "  \"response\": {\n"
        "    \"sentiment\": \"<one of: Positive, Neutral, Negative>\",\n"
        "    \"concise_reason\": \"<brief reason for classification>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always follow the exact JSON structure shown above.\n"
        "- Do not include any text or explanation outside the JSON.\n"
    )
)

ResponseAgent = Agent(
    role="Responder",
    prompt_persona=(
        "You are a helpful customer support assistant. Your task is to craft an empathetic and helpful response "
        "to the user by considering the provided issue, sentiment, knowledge base context, and conversation history.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"Responder\",\n"
        "  \"response\": {\n"
        "    \"message\": \"<empathetic and helpful response crafted for the user>\",\n"
        "    \"concise_reason\": \"<brief reason explaining how this response was formulated based on inputs>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always follow the exact JSON structure.\n"
        "- The 'message' should be empathetic, helpful, and contextually relevant.\n"
        "- The 'concise_reason' should summarize how issue type, sentiment, KB context, and history influenced the response.\n"
        # Adjacent string literals are concatenated; separating them with commas would turn prompt_persona into a tuple
        "- If you're unsure how you could help, escalate the situation and say that you have escalated and a support agent will contact the customer shortly to better understand the issue\n"
        "- If order ID is not provided ask for order ID\n"
        "- If order ID not found then you can ask customer to provide correct order id else you won't be able to process the request\n"
        "- In all of the user's request, first try to process the request by yourself, if you can't then tell that someone from team will reach out\n"
        "- You're a customer support agent. You have access to all the necessary tools to process any query, just keep asking questions till you have all the info needed to process the request\n"
        "- Do not include any extra text or explanation outside the JSON.\n"
    )
)

EscalationAgent = Agent(
    role="Escalation",
    prompt_persona=(
        "You are a triage specialist. Your task is to determine if a customer issue needs escalation.\n"
        "- If sentiment is Negative OR the issue type is Technical, classify as 'ESCALATE'.\n"
        "- Otherwise, classify as 'NO_ESCALATION'.\n\n"
        "Respond strictly in JSON format with the following schema:\n\n"
        "{\n"
        "  \"agent_name\": \"Escalation\",\n"
        "  \"response\": {\n"
        "    \"escalation_decision\": \"<one of: ESCALATE, NO_ESCALATION>\",\n"
        "    \"concise_reason\": \"<brief reason for escalation decision>\"\n"
        "  }\n"
        "}\n\n"
        "Instructions:\n"
        "- Always follow the exact JSON structure.\n"
        "- Base your decision strictly on sentiment and issue type.\n"
        "- Do not include any extra text or explanation outside the JSON.\n"
    )
)

This script defines the core AI agents of the multi-agent system, each with a narrow, well-defined role and strict JSON output.

  • Data Extraction Agents — Extract key details from queries (tenant type, order ID, image path, order info, customer info, related tickets).

  • Knowledge & Policy Retrieval Agents — Retrieve relevant FAQs, company policies, and handbook entries.

  • Product Quality & Returns Agents — Compare product images for defects and validate returns against original orders.

  • Routing, Sentiment & Escalation Agents — Classify issue types, detect sentiment, and determine escalation paths.

  • Response Generation Agent — Consolidate all gathered context to produce a helpful, context-aware final reply.
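Because every agent is asked to reply with the same agent_name/response envelope, the caller can validate a reply before trusting it. The sketch below is one way this might look; parse_agent_reply and strip_code_fence are hypothetical helpers (they are not part of Lyzr or of this project's utils module), and the sketch assumes the model may occasionally wrap its JSON in a Markdown code fence even though the prompts forbid extra text.

```python
import json

FENCE = "`" * 3  # Markdown code-fence marker


def strip_code_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence, if the model added one."""
    text = text.strip()
    if text.startswith(FENCE):
        text = text[len(FENCE):]
        if text.lower().startswith("json"):
            text = text[len("json"):]
        if text.endswith(FENCE):
            text = text[:-len(FENCE)]
    return text.strip()


def parse_agent_reply(raw: str, expected_agent: str, required_keys: set[str]) -> dict:
    """Parse a reply and check it follows the {agent_name, response} envelope."""
    data = json.loads(strip_code_fence(raw))  # raises json.JSONDecodeError if malformed
    if data.get("agent_name") != expected_agent:
        raise ValueError(f"unexpected agent_name: {data.get('agent_name')!r}")
    response = data.get("response")
    if not isinstance(response, dict):
        raise ValueError("'response' must be a JSON object")
    missing = required_keys - set(response)
    if missing:
        raise ValueError(f"missing keys in response: {sorted(missing)}")
    return response
```

A caller could, for example, run parse_agent_reply(raw, "Router", {"issue_type", "concise_reason"}) and retry the task when a ValueError or json.JSONDecodeError is raised.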

Step 4: Implementing Data Retriever

In src/qdrant_util/qdrant_retriever.py, we define retriever functions that retrieve the data needed for each task.

import json
from qdrant_client import QdrantClient, models
from fastembed import TextEmbedding, ImageEmbedding
from fastembed import SparseTextEmbedding
from qdrant_client.models import (
    Filter, FieldCondition, MatchValue, MatchAny,
    Prefetch, SparseVector, FusionQuery, Fusion,
)


dense_embedding_model = TextEmbedding(model_name="BAAI/bge-small-en-v1.5")
sparse_embedding_model = SparseTextEmbedding(model_name="prithivida/Splade_PP_en_v1")
image_embedding_model = ImageEmbedding(model_name="Qdrant/clip-ViT-B-32-vision")

def retrieve_context(
    client: QdrantClient,
    collection_name: str,
    query_text: str,
    tenant_id: str,
    image_path: str = None,
    source_type: str = None,
    tags: list[str] = None,
    customer_id: str = None,
    k_prefetch: int = 10,
    top_k: int = 5,
    fusion_method: Fusion = Fusion.RRF,
):
    """
    Retrieve the top-K most semantically similar points matching the given filters.
    """

    dense_vector = None
    sparse_vec = None
    prefetches = []

    if query_text:
        dense_vector = list(dense_embedding_model.embed([query_text]))[0]
        sparse_result = list(sparse_embedding_model.embed([query_text]))[0]
        sparse_vec = SparseVector(
            indices=sparse_result.indices,
            values=sparse_result.values,
        )

        prefetches.append(
            Prefetch(query=sparse_vec, using="sparse", limit=k_prefetch)
        )
        prefetches.append(
            Prefetch(
                query=dense_vector.tolist() if hasattr(dense_vector, "tolist") else dense_vector,
                using="dense",
                limit=k_prefetch,
            )
        )

    if image_path and collection_name == "orders":
        # Embed the uploaded image with the module-level CLIP model
        image_vec = list(image_embedding_model.embed([image_path]))[0]
        prefetches.append(
            Prefetch(
                query=image_vec.tolist() if hasattr(image_vec, "tolist") else image_vec,
                using="image",
                limit=k_prefetch,
            )
        )

    must_clauses = []
    if tenant_id:
        must_clauses.append(FieldCondition(key="tenant_id", match=MatchValue(value=tenant_id)))
    if source_type:
        must_clauses.append(FieldCondition(key="source_type", match=MatchValue(value=source_type)))
    if customer_id:
        must_clauses.append(FieldCondition(key="customer_id", match=MatchValue(value=customer_id)))
    if tags:
        must_clauses.append(FieldCondition(key="tags", match=MatchAny(any=tags)))

    payload_filter = Filter(must=must_clauses)

    fusion_query = FusionQuery(fusion=fusion_method)

    results = client.query_points(
        collection_name=collection_name,
        prefetch=prefetches,
        query=fusion_query,
        query_filter=payload_filter,
        limit=top_k,
        with_payload=True
    )

    return [
        {
            "id": hit.id,
            'similarity_with_query': hit.score,
            "payload": hit.payload
        }
        for hit in results.points
    ]

def retrieve_customer_info(
    client: QdrantClient,
    tenant_id: str,
    customer_id: str,
):
    must_clauses = [
        FieldCondition(key="tenant_id", match=MatchValue(value=tenant_id))
    ]
    must_clauses.append(
        FieldCondition(key="source_type", match=MatchValue(value="crm"))
    )
    must_clauses.append(
        FieldCondition(key="customer_id", match=MatchValue(value=customer_id))
    )

    payload_filter = Filter(must=must_clauses)

    results, _ = client.scroll(
        collection_name="user_data",
        scroll_filter=payload_filter,
        limit=1,
    )

    if not results:
        return f"No customer information found for this tenant_id: {tenant_id} and customer_id: {customer_id}."

    return results[0].payload

def retrieve_customer_helpdesk_logs(client: QdrantClient, query: str, customer_id: str, tenant_id: str, top_k: int = 3, k_prefetch: int = 10) -> str:
    """
    Retrieve the helpdesk tickets most relevant to the query for one customer,
    searching the user_data collection with source_type="helpdesk".
    """

    helpdesk_records = retrieve_context(
        client=client,
        collection_name="user_data",
        query_text=query,
        tenant_id=tenant_id,
        source_type="helpdesk",
        customer_id=customer_id,
        top_k=top_k,
        k_prefetch = k_prefetch,
        fusion_method = Fusion.RRF,
    )

    if not helpdesk_records:
        return f"No relevant customer helpdesk ticket found for this tenant_id: {tenant_id} and customer_id: {customer_id} for this particular query."

    # Drop internal point IDs before handing the records to the LLM
    sanitized_records = [{k: v for k, v in record.items() if k != 'id'} for record in helpdesk_records]
    return json.dumps(sanitized_records, indent=2)

def retrieve_related_knowledge_base(client: QdrantClient, query: str, tenant_id: str, source_type: str, tags: list=None, top_k: int = 3, k_prefetch: int = 10) -> str:
    related_kb = retrieve_context(
        client=client,
        collection_name="knowledge_base",
        query_text=query,
        tenant_id=tenant_id,
        source_type=source_type,
        tags=tags,
        top_k=top_k,
        k_prefetch = k_prefetch,
        fusion_method = Fusion.RRF,
    )

    if not related_kb:
        return f"No relevant knowledge base found for tenant_id: {tenant_id}, source_type: {source_type} with tags: {tags} for this particular query"

    sanitized_records = [{k: v for k, v in doc.items() if k != 'id'} for doc in related_kb]
    context = json.dumps(sanitized_records, indent=2)
    return context

def retrieve_order_info(
    client: QdrantClient,
    tenant_id: str,
    customer_id: str,
    order_id: str,
):
    must_clauses = [
        FieldCondition(key="tenant_id", match=MatchValue(value=tenant_id))
    ]
    must_clauses.append(
        FieldCondition(key="order_id", match=MatchValue(value=order_id))
    )
    must_clauses.append(
        FieldCondition(key="customer_id", match=MatchValue(value=customer_id))
    )

    payload_filter = Filter(must=must_clauses)

    results, _ = client.scroll(
        collection_name="orders",
        scroll_filter=payload_filter,
        limit=1,
    )

    if not results:
        return f"No order information found for this tenant_id: {tenant_id}, customer_id: {customer_id}, and order_id: {order_id}"

    return results[0].payload


def retrieve_image_info(
    client: QdrantClient,
    query_text: str,
    image_path: str,
    tenant_id: str,
    customer_id: str,
    top_k: int = 3,
    k_prefetch: int = 10,
):
    """
    Retrieve similar orders by multimodal query (text + image).
    """
    return retrieve_context(
        client=client,
        collection_name="orders",
        query_text=query_text,
        image_path=image_path,
        tenant_id=tenant_id,
        customer_id=customer_id,
        top_k=top_k,
        k_prefetch=k_prefetch,
        fusion_method=Fusion.RRF
    )

This script defines the retrieval layer of the multi-agent system: the functions that fetch relevant context from Qdrant to support agent reasoning. It initializes dense, sparse, and image embedding models, and provides utilities for:

  • General Context Retrieval → retrieve_context() performs hybrid dense+sparse (and optional image) search with prefetch, fusion (RRF), and metadata filters like tenant_id, customer_id, source_type, and tags.

  • Customer Data Retrieval → retrieve_customer_info() pulls CRM records from the user_data collection, while retrieve_customer_helpdesk_logs() fetches and filters relevant helpdesk tickets for a given query.

  • Knowledge Base Retrieval → retrieve_related_knowledge_base() fetches FAQs, policies, or handbook entries with hybrid search, returning top-matched documents.

  • Order Data Retrieval → retrieve_order_info() retrieves structured order details for a given tenant, customer, and order ID, while retrieve_image_info() supports multimodal retrieval by combining text and product images.

Together, these functions form the backbone of retrieval-augmented reasoning, ensuring every agent task has precise, tenant-specific, and context-rich inputs from Qdrant.
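To make the fusion step less abstract, here is a minimal pure-Python sketch of Reciprocal Rank Fusion, the idea behind Fusion.RRF. It is an illustration only: the function name, the sample IDs, and the constant k=60 (a commonly used default) are assumptions, and Qdrant's internal implementation and constant may differ.

```python
from collections import defaultdict


def reciprocal_rank_fusion(ranked_lists, k=60):
    """Fuse several ranked lists of point IDs into one ranking.

    Each list contributes 1 / (k + rank) for every ID it contains,
    so IDs that rank well in several lists rise to the top.
    """
    scores = defaultdict(float)
    for ranking in ranked_lists:
        for rank, point_id in enumerate(ranking, start=1):
            scores[point_id] += 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Hypothetical results of the dense and sparse prefetches
dense_hits = ["faq-2", "faq-7", "faq-1"]
sparse_hits = ["faq-7", "faq-3", "faq-2"]

fused = reciprocal_rank_fusion([dense_hits, sparse_hits])
for point_id, score in fused:
    print(point_id, round(score, 5))
```

In this example faq-7 and faq-2 appear in both lists, so they end up ahead of faq-3 and faq-1, which appear in only one. Note that the fused value is a rank-based score, not a cosine similarity, which is worth keeping in mind when reading the similarity_with_query field.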

Step 5: Implementing Tasks

In src/agents_util/tasks.py, we define tasks that agents execute, integrating Qdrant retrieval.

from qdrant_client import QdrantClient
from lyzr_automata import Task
from lyzr_automata.tasks.task_literals import InputType, OutputType

from llm import load_gemini_model
from agents_util.agents import (
    TenantResolverAgent, 
    CustomerInfoExtractorAgent,
    TicketExtractorAgent,
    FAQExtractorAgent,
    HandbookExtractorAgent,
    PolicyExtractorAgent,
    RouterAgent,
    SentimentAgent, 
    EscalationAgent, 
    ResponseAgent,
    ReturnValidationAgent,
    ProductQualityCheckAgent,
    OrderIDExtractorAgent,
    ImagePathExtractorAgent,
    OrderInfoExtractorAgent
)
from qdrant_util.qdrant_retriever import (
    retrieve_customer_info, 
    retrieve_customer_helpdesk_logs, 
    retrieve_related_knowledge_base,
    retrieve_order_info,
    retrieve_image_info
)


import os
from dotenv import load_dotenv
load_dotenv()

GEMINI_MODEL_NAME = os.getenv("GEMINI_MODEL_NAME")

gemini_model = load_gemini_model(model_name=GEMINI_MODEL_NAME)
qdrant = QdrantClient(host="localhost", port=6333)

# resolved_faq_tags = ['payments']
# resolved_policy_tags = ['payments']
# resolved_handbook_tags = ['payments']

def get_image_path_extraction_task(user_input):
    task = Task(
        name="ImagePathExtraction",
        agent=ImagePathExtractorAgent,
        model=gemini_model,
        instructions=user_input,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task

def get_order_id_extraction_task(user_input):
    task = Task(
        name="OrderIDExtraction",
        agent=OrderIDExtractorAgent,
        model=gemini_model,
        instructions=user_input,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task

def get_return_product_validation_task(tenant_id, customer_id, order_id, image_path):

    order_info = retrieve_order_info(
        client = qdrant,
        tenant_id = tenant_id,
        customer_id = customer_id,
        order_id = order_id
    )
    retrieved_image_info = retrieve_image_info(
        client = qdrant,
        image_path=image_path,
        tenant_id = tenant_id,
        customer_id = customer_id,
        top_k=1,
        k_prefetch=10,
        query_text=None
    )
    context = f"""
Original Product Info
----------------------
{order_info}

Retrieved Image Info
--------------------
{retrieved_image_info}
"""
    task = Task(
        name="ReturnValidation",
        agent=ReturnValidationAgent,
        model=gemini_model,
        instructions=context,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task


def get_order_info_task(tenant_id, customer_id, order_id):
    order_info = retrieve_order_info(
        client = qdrant,
        tenant_id = tenant_id,
        customer_id = customer_id,
        order_id = order_id
    )
    task = Task(
        name="OrderInfoExtraction",
        agent=OrderInfoExtractorAgent,
        model=gemini_model,
        instructions=order_info,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task

def get_product_quality_check_task(tenant_id, customer_id, order_id, image_path):
    order_info = retrieve_order_info(
        client = qdrant,
        tenant_id = tenant_id,
        customer_id = customer_id,
        order_id = order_id
    )
    retrieved_image_info = retrieve_image_info(
        client = qdrant,
        image_path=image_path,
        tenant_id = tenant_id,
        customer_id = customer_id,
        top_k=1,
        k_prefetch=10,
        query_text=None
    )
    context = f"""
Original Product Info
----------------------
{order_info}

Retrieved Image Info
--------------------
{retrieved_image_info}
"""
    task = Task(
        name="ProductQualityChecker",
        agent=ProductQualityCheckAgent,
        model=gemini_model,
        instructions=context,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task

def get_tenant_identification_task(user_input):
    task = Task(
        name="TenantIdentification",
        agent=TenantResolverAgent,
        model=gemini_model,
        instructions=user_input,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task

def get_customer_info_extraction_task(user_input, tenant_id, customer_id):
    context = {
        "user_query": user_input,
        "customer_info": retrieve_customer_info(
            client = qdrant,
            tenant_id = tenant_id,
            customer_id = customer_id,
        )
    }
    task = Task(
        name="CustomerInfoExtraction",
        agent=CustomerInfoExtractorAgent,
        model=gemini_model,
        instructions=context,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task

def get_ticket_extraction_task(user_input, customer_id, tenant_id, top_k=3, k_prefetch=10):
    context = {
        "user_query": user_input,
        "helpdesk_logs": retrieve_customer_helpdesk_logs(
            client = qdrant,
            query =  user_input, 
            customer_id = customer_id,
            tenant_id = tenant_id,
            top_k = top_k,
            k_prefetch = k_prefetch
        )
    }
    task = Task(
        name="TicketExtraction",
        agent=TicketExtractorAgent,
        model=gemini_model,
        instructions=context,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task

def get_faq_extraction_task(user_input, tenant_id, top_k=3, k_prefetch=10, fail_feedback=None):
    context = {
        "user_query": user_input,
        "faqs": retrieve_related_knowledge_base(
            client = qdrant,
            query =  user_input, 
            source_type = "faqs",
            tenant_id = tenant_id,
            tags = None,
            top_k = top_k,
            k_prefetch = k_prefetch
        )
    }
    if fail_feedback:
        context['error_in_prev_generated_response'] = fail_feedback
    task = Task(
        name="FAQExtraction",
        agent=FAQExtractorAgent,
        model=gemini_model,
        instructions=context,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task

def get_handbook_extraction_task(user_input, tenant_id, top_k=3, k_prefetch=10, fail_feedback=None):
    context = {
        "user_query": user_input,
        "handbook": retrieve_related_knowledge_base(
            client = qdrant,
            query =  user_input, 
            source_type = "handbook",
            tenant_id = tenant_id,
            tags = None,
            top_k = top_k,
            k_prefetch = k_prefetch
        )
    }
    if fail_feedback:
        context['error_in_prev_generated_response'] = fail_feedback
    task = Task(
        name="HandbookExtraction",
        agent=HandbookExtractorAgent,
        model=gemini_model,
        instructions=context,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task

def get_policy_extraction_task(user_input, tenant_id, top_k=3, k_prefetch=10, fail_feedback=None):
    context = {
        "user_query": user_input,
        "policy": retrieve_related_knowledge_base(
            client = qdrant,
            query =  user_input, 
            source_type = "policy",
            tenant_id = tenant_id,
            tags = None,
            top_k = top_k,
            k_prefetch = k_prefetch
        )
    }
    if fail_feedback:
        context['error_in_prev_generated_response'] = fail_feedback
    task = Task(
        name="PolicyExtraction",
        agent=PolicyExtractorAgent,
        model=gemini_model,
        instructions=context,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return task



def get_routing_task(user_input):
    route_task = Task(
        name="RouteIssue",
        agent=RouterAgent,
        model=gemini_model,
        instructions=user_input,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return route_task


def get_sentiment_analysis_task(user_input):
    senti_task = Task(
        name="AnalyzeSentiment",
        agent=SentimentAgent,
        model=gemini_model,
        instructions=user_input,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT
    )
    return senti_task


def get_escalation_task(user_input, route_task, senti_task):
    escalation_task = Task(
        name="CheckEscalation",
        agent=EscalationAgent,
        model=gemini_model,
        instructions=user_input,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT,
        input_tasks = [
            route_task, 
            senti_task
        ]
    )
    return escalation_task

def get_response_task(full_context, history, route_task, senti_task, escalation_task):
    resp_instructions = (
        "You have been provided with the issue type, "
        "the sentiment of the user, "
        f"the Knowledge Base context:\n{full_context}\n"
        f"the History: {history}\n"
        "and the Escalation status.\n\n"
        "Now craft the final support response based on all of the above."
    )
    response_task = Task(
        name="GenerateResponse",
        agent=ResponseAgent, 
        model=gemini_model,
        instructions=resp_instructions,
        input_type=InputType.TEXT,
        output_type=OutputType.TEXT,
        input_tasks = [
            route_task, 
            senti_task, 
            escalation_task
        ]
    )
    return response_task

This script wraps the earlier raw agents into executable Task objects for the orchestration pipeline. It loads the Gemini LLM, connects to Qdrant, and defines the following:

  • Pure Extraction Tasks — Extract structured details (e.g., image path, order ID, tenant type).

  • Retrieval-Augmented Tasks — Pull context from Qdrant before extraction (customer info, tickets, FAQs, policies, handbooks, order info).

  • Multimodal Validation Tasks — Combine text and image data for product quality checks and return validation.

  • Flow Control Tasks — Handle routing, sentiment analysis, and escalation decisions.

  • Response Generation Task — Use all gathered context, history, and classification outputs to generate the final answer.
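Several of the retrieval-augmented tasks above pass a Python dict as instructions. Whether a Task accepts a dict as-is depends on the Lyzr version and on the model wrapper, so one cautious option is to serialize the context to a JSON string first. The helper below is a hypothetical sketch (build_instructions is not part of the original code); it mirrors the context layout used in get_faq_extraction_task and its siblings.

```python
import json


def build_instructions(user_query, context_key, retrieved, fail_feedback=None):
    """Serialize a task context to a JSON string (hypothetical helper).

    context_key is the label used for the retrieved data,
    e.g. "faqs", "policy", "handbook", or "helpdesk_logs".
    """
    context = {"user_query": user_query, context_key: retrieved}
    if fail_feedback:
        # Same key the extraction tasks use to pass feedback from a failed attempt
        context["error_in_prev_generated_response"] = fail_feedback
    # default=str keeps non-JSON-native payload values (e.g. datetimes) from raising
    return json.dumps(context, indent=2, default=str)


prompt = build_instructions(
    "Where is my refund?",
    "faqs",
    [{"similarity_with_query": 0.5, "payload": {"question": "How long do refunds take?"}}],
)
print(prompt)
```

The resulting string could then be passed as instructions=prompt when constructing the Task, which keeps the input type consistent with InputType.TEXT.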

Step 6: Running the Chat Session

In run_chat.py, we orchestrate the workflow using Lyzr’s LinearSyncPipeline.

import uuid
from lyzr_automata.pipelines.linear_sync_pipeline import LinearSyncPipeline

from llm import load_gemini_model
from utils import (
     get_history, 
     save_message,
     text_2_json,
     task_with_feedback_loop
)
from agents_util.tasks import (
     get_tenant_identification_task, 
     get_customer_info_extraction_task,
     get_ticket_extraction_task,
     get_faq_extraction_task,
     get_handbook_extraction_task,
     get_policy_extraction_task,
     get_routing_task,
     get_sentiment_analysis_task,
     get_escalation_task,
     get_response_task,
     get_image_path_extraction_task,
     get_order_id_extraction_task,
     get_return_product_validation_task,
     get_product_quality_check_task,
     get_order_info_task
)
from qdrant_util.caching import SemanticCache

import os
from dotenv import load_dotenv
load_dotenv()

GEMINI_MODEL_NAME = os.getenv("GEMINI_MODEL_NAME")

gemini_model = load_gemini_model(model_name=GEMINI_MODEL_NAME)
cache = SemanticCache(threshold=0.2)

debug = False

def run_session():
    session_id = str(uuid.uuid4())
    print(f"Session {session_id} started.")

    while True:
        user_input = input("User: ")
        # The customer ID is hard-coded for this demo;
        # in a product it can be taken directly from the request body
        resolved_customer_id = 'CUST-010'
        if user_input.lower() == "exit":
            break

        save_message(session_id, "user", user_input)

        history = get_history(session_id)
        tenant_task_response = get_tenant_identification_task(user_input).execute()
        tenant_task_response = text_2_json(tenant_task_response)
        resolved_tenant_id = tenant_task_response['response']['tenant_type']
        # save_message(session_id, "TenantResolverAgent", tenant_task_response)

        cached_response = cache.check_cache(user_input, resolved_tenant_id, resolved_customer_id)
        if cached_response:
            final_message = cached_response
        else:
            customer_info_task_response = get_customer_info_extraction_task(user_input, resolved_tenant_id, resolved_customer_id).execute()
            customer_info_task_response = text_2_json(customer_info_task_response)
            final_customer_info = customer_info_task_response['response']['customer_info']
            # save_message(session_id, "CustomerInfoRetrieverAgent", customer_info_task_response)

            ticket_task_response = get_ticket_extraction_task(user_input, resolved_customer_id, resolved_tenant_id, top_k=3, k_prefetch=10).execute()
            ticket_task_response = text_2_json(ticket_task_response)
            relavant_ticket_info = ticket_task_response['response']['related_tickets']
            # save_message(session_id, "TicketInfoRetrieverAgent", ticket_task_response)

            # doesn't need to return anything if it doesn't find any related faq
            # can have multiple faqs, if need be
            # do not blindly rely on similarity with query value, use your own brain
            relavant_faqs, faq_task_response = task_with_feedback_loop(
                user_input, get_faq_extraction_task, resolved_tenant_id, 'related_faqs', 
                session_id, 'FAQsRetrieverAgent',"Couldn't find any FAQs related to the user query", top_k=3, k_prefetch=10
            )

            relavant_policy, policy_task_response = task_with_feedback_loop(
                user_input, get_policy_extraction_task, resolved_tenant_id, 'related_policies', 
                session_id, 'PolicyRetrieverAgent',"Couldn't find any policy related to the user query", top_k=3, k_prefetch=10
            )

            relavant_handbook, handbook_task_response = task_with_feedback_loop(
                user_input, get_handbook_extraction_task, resolved_tenant_id, 'related_handbooks', 
                session_id, 'HandbookRetrieverAgent',"Couldn't find any handbook related to the user query", top_k=3, k_prefetch=10
            )

            image_path_task_response = get_image_path_extraction_task(user_input).execute()
            image_path_task_response = text_2_json(image_path_task_response)
            image_path = image_path_task_response['response']['image_path']

            order_id_extraction_task = get_order_id_extraction_task(user_input).execute()
            order_id_extraction_task = text_2_json(order_id_extraction_task)
            order_id = order_id_extraction_task['response']['order_id']

            # Only process order info if order_id is not None and not empty
            if order_id and order_id.strip():
                try:
                    order_info_task = get_order_info_task(resolved_tenant_id, resolved_customer_id, order_id).execute()
                    order_info_task = text_2_json(order_info_task)
                    order_info = order_info_task['response']['order_info']
                except Exception as e:
                    order_info = f"Could not retrieve order information for order ID: {order_id}. Error: {str(e)}"
            else:
                order_info = "No order ID found in the user query."


            if image_path and order_id and order_id.strip():
                try:
                    return_product_validation_task_response = get_return_product_validation_task(resolved_tenant_id, resolved_customer_id, order_id, image_path).execute()
                    return_product_validation_task_response = text_2_json(return_product_validation_task_response)
                    return_validation_check = return_product_validation_task_response['response']

                    product_quality_check_task_response = get_product_quality_check_task(resolved_tenant_id, resolved_customer_id, order_id, image_path).execute()
                    product_quality_check_task_response = text_2_json(product_quality_check_task_response)
                    product_quality_check = product_quality_check_task_response['response']
                except Exception as e:
                    return_validation_check = f"Could not validate return for order {order_id}. Error: {str(e)}"
                    product_quality_check = f"Could not check product quality for order {order_id}. Error: {str(e)}"
            else:
                if not image_path:
                    return_validation_check = "Can't say as the user has not provided the image of the product"
                    product_quality_check = "Can't say as the user has not provided the image of the product"
                else:
                    return_validation_check = "Can't process as no valid order ID was found"
                    product_quality_check = "Can't process as no valid order ID was found"

            full_context = f"""
Customer Info
-------------
{final_customer_info}

Related User's Issued Tickets
------------------------------
{relavant_ticket_info}

Relevant FAQs for the User Query
---------------------------------
{relavant_faqs}

Relevant Policies for the User Query
---------------------------------
{relavant_policy}

Relevant Handbooks for the User Query
---------------------------------
{relavant_handbook}

Order Info
----------
{order_info}

Product Quality Check Info For Returning The Defect Free Product
----------------------------------------------------------------
{product_quality_check}

Product Return Validation Check
------------------------------- 
{return_validation_check}

"""
            # In debug mode, print every intermediate value gathered above, ending with the assembled context.
            if debug:
                print('Resolved Customer ID:\n\t', resolved_customer_id)
                print('Resolved Tenant ID:\n\t', resolved_tenant_id)
                print('Extracted Customer Info:\n\t', final_customer_info)
                print('Extracted Tickets:\n\t', relavant_ticket_info)
                print('Extracted FAQs:\n\t', relavant_faqs)
                print('Extracted Policies:\n\t', relavant_policy)
                print('Extracted Handbooks:\n\t', relavant_handbook)
                print('Extracted Image Path:\n\t', image_path)
                print('Extracted Order ID:\n\t', order_id)
                print('Extracted Order Info:\n\t', order_info)
                print('Extracted Return Validation Check:\n\t', return_validation_check)
                print('Extracted Product Quality Check:\n\t', product_quality_check)
                print('Full Context:\n\t', full_context)

            routing_task = get_routing_task(user_input)
            senti_task = get_sentiment_analysis_task(user_input)
            escalation_task = get_escalation_task(user_input, routing_task, senti_task)
            responding_task = get_response_task(full_context, history, routing_task, senti_task, escalation_task)


            pipeline = LinearSyncPipeline(
                name="MultiAgentSupport",
                completion_message="Done",
                tasks=[
                    routing_task,
                    senti_task,
                    escalation_task,
                    responding_task
                ]
            )
            outputs = pipeline.run()

            response = outputs[-1]['task_output']
            response = text_2_json(response)
            final_message = response['response']['message']
            cache.add_to_cache(user_input, final_message, resolved_tenant_id, resolved_customer_id)

        print(f"Agent: {final_message}")
        save_message(session_id, "assistant", final_message)

if __name__ == "__main__":
    run_session()

This script is the live chat orchestrator for the multi-agent system, integrating all agents, retrieval functions, and caching into real-time query handling. It loads the Gemini 2.0 Flash model, sets up a semantic cache (threshold = 0.2), and manages session history.

For each query, the system:

  1. Resolves the tenant.

  2. Checks the cache.

  3. If no cache hit, retrieves and processes customer info, tickets, knowledge base entries, order details, and product validation/quality checks.

The gathered context then flows through a final agent chain — Routing, Sentiment, Escalation, and Response — in a sequential pipeline, generating a context-aware reply that is also stored for future reuse.
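
The cache-first flow described above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the script itself: ExactMatchCache is a hypothetical stand-in that only matches identical strings (the real SemanticCache matches by embedding distance), and fake_pipeline is a hypothetical placeholder for the Routing, Sentiment, Escalation, and Response chain.

```python
from typing import Callable, Dict, List, Optional, Tuple


class ExactMatchCache:
    """Hypothetical stand-in for SemanticCache: matches identical strings only."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str, str], str] = {}

    def check_cache(self, query_text: str, tenant_id: str, customer_id: str) -> Optional[str]:
        # Entries are scoped to one tenant and one customer, as in the article.
        return self._store.get((tenant_id, customer_id, query_text))

    def add_to_cache(self, query_text: str, response_text: str,
                     tenant_id: str, customer_id: str) -> None:
        self._store[(tenant_id, customer_id, query_text)] = response_text


def answer_query(query: str, tenant_id: str, customer_id: str,
                 cache: ExactMatchCache,
                 run_pipeline: Callable[[str], str]) -> Tuple[str, bool]:
    """Return (reply, served_from_cache)."""
    cached = cache.check_cache(query, tenant_id, customer_id)
    if cached is not None:
        return cached, True              # cache hit: the agent chain is skipped
    reply = run_pipeline(query)          # cache miss: run the full agent chain
    cache.add_to_cache(query, reply, tenant_id, customer_id)
    return reply, False


pipeline_calls: List[str] = []


def fake_pipeline(query: str) -> str:
    # Hypothetical placeholder for the expensive multi-agent pipeline.
    pipeline_calls.append(query)
    return f"reply to: {query}"


cache = ExactMatchCache()
first = answer_query("Where is my refund?", "tenant_a", "cust_1", cache, fake_pipeline)
second = answer_query("Where is my refund?", "tenant_a", "cust_1", cache, fake_pipeline)
other = answer_query("Where is my refund?", "tenant_a", "cust_2", cache, fake_pipeline)
print(first, second, other, len(pipeline_calls))
```

The second call is served from the cache, while the same question from a different customer runs the pipeline again, because cached answers are never shared across customers.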

Agents make many LLM requests to complete a single task, so caching responses pays off. When a similar question arrives later, the system can return the cached result instead of making new LLM calls, which reduces operational costs. In the example above, without caching, the exact same query triggered two separate LLM calls, unnecessarily increasing the cost.

Semantic Caching

import uuid
import time
from qdrant_client import QdrantClient, models
from fastembed import TextEmbedding
from qdrant_client.models import PointStruct, NamedVector

DENSE_EMBEDDING_MODEL_NAME = "BAAI/bge-small-en-v1.5"
DENSE_EMBEDDING_MODEL = TextEmbedding(model_name=DENSE_EMBEDDING_MODEL_NAME)
client = QdrantClient(host="localhost", port=6333)

class SemanticCache:
    def __init__(self, threshold: float = 0.2):
        self.client = client
        self.embedding_model = DENSE_EMBEDDING_MODEL
        self.collection_name = "semantic_cache"
        self.threshold = threshold 

    def check_cache(self, query_text: str, tenant_id: str, customer_id: str):
        """Checks the cache for a semantically similar query."""
        start_time = time.time()
        query_vector = list(self.embedding_model.embed([query_text]))[0]

        search_result = self.client.search(
            collection_name=self.collection_name,
            query_vector=NamedVector(
                name="dense",
                vector=query_vector
            ),
            query_filter=models.Filter(
                must=[
                    models.FieldCondition(key="tenant_id", match=models.MatchValue(value=tenant_id)),
                    models.FieldCondition(key="customer_id", match=models.MatchValue(value=customer_id))
                ]
            ),
            limit=1,
            with_payload=True
        )
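        # A hit requires score <= threshold, so the score is treated as a distance (lower = closer).
        # This presumes the collection was created with a distance metric such as Euclidean.
        if search_result and search_result[0].score <= self.threshold: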
            end_time = time.time()
            print(f"CACHE HIT! (Score: {search_result[0].score:.4f}, Time: {end_time - start_time:.4f}s)")
            return search_result[0].payload.get("response")

        print("CACHE MISS!")
        return None

    def add_to_cache(self, query_text: str, response_text: str, tenant_id: str, customer_id: str):
        """Adds a new query-response pair to the cache."""
        query_vector = list(self.embedding_model.embed([query_text]))[0]

        points=PointStruct(
            id=str(uuid.uuid4()),
            vector={
                "dense": query_vector,
            },
            payload={
                'response': response_text,
                'tenant_id': tenant_id,
                'customer_id': customer_id,
            },
        )

        self.client.upsert(
            collection_name=self.collection_name,
            points=[points],
            wait=True
        )
        print("Added new entry to semantic cache.")

Purpose of the Semantic Cache

The semantic cache stores query–response pairs as dense embeddings in Qdrant, enabling fast replies to semantically similar queries. Each new query is embedded with the BAAI/bge-small-en-v1.5 model and searched against entries that share the same tenant_id and customer_id. If the closest match scores at or below the threshold (0.2), the cached answer is returned. Note that the <= comparison treats the score as a distance, so lower means more similar; this presumes the collection uses a distance metric such as Euclidean. On a cache miss, the system runs the full pipeline and then embeds and stores the new query–response pair. Skipping full retrieval and reasoning for repeated questions lowers latency while keeping cached answers scoped to a single tenant and customer.
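
To build intuition for the 0.2 threshold, here is a small pure-Python sketch of the lookup rule. It assumes the score is a Euclidean distance (which is what the <= comparison in check_cache implies) and that embeddings are unit length. Under those assumptions, distance d and cosine similarity are related by cos = 1 - d²/2, so d <= 0.2 corresponds to a cosine similarity of at least 0.98. The toy 2-D vectors, the lookup function, and the entries list are all hypothetical and stand in for real embeddings and the Qdrant collection.

```python
import math
from typing import List, Optional


def cosine_for_distance(d: float) -> float:
    """For unit-length vectors, ||a - b||^2 = 2 - 2*cos, hence cos = 1 - d^2 / 2."""
    return 1.0 - (d * d) / 2.0


def normalize(v: List[float]) -> List[float]:
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def lookup(query_vec: List[float], entries: List[dict],
           tenant_id: str, customer_id: str,
           threshold: float = 0.2) -> Optional[str]:
    # Filter first, mirroring the tenant_id / customer_id filter in check_cache.
    candidates = [e for e in entries
                  if e["tenant_id"] == tenant_id and e["customer_id"] == customer_id]
    if not candidates:
        return None
    # Nearest neighbour by Euclidean distance (lower = closer).
    best = min(candidates, key=lambda e: math.dist(query_vec, e["vector"]))
    if math.dist(query_vec, best["vector"]) <= threshold:
        return best["response"]
    return None


entries = [{
    "vector": normalize([1.0, 0.0]),
    "tenant_id": "t1",
    "customer_id": "c1",
    "response": "cached refund answer",
}]

near = lookup(normalize([1.0, 0.05]), entries, "t1", "c1")      # distance ~0.05: hit
far = lookup(normalize([1.0, 1.0]), entries, "t1", "c1")        # distance ~0.77: miss
other_customer = lookup(normalize([1.0, 0.0]), entries, "t1", "c2")  # filtered out
print(near, far, other_customer, round(cosine_for_distance(0.2), 2))
```

A threshold this tight only reuses answers for near-paraphrases. Loosening it raises the hit rate but also the risk of returning an answer to a question the customer did not ask.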

Results

Information Extraction

Here, the user enquires about the refund status. The agent successfully gathers all related information and crafts a response based on it. The agent's capabilities could be extended further by integrating real-time calling APIs that let it place a call, understand the situation, and resolve it accordingly. The more external tools you integrate, the more autonomously your agents can resolve tasks.

Semantic Caching

In our semantic cache vector database, this query is now stored. If a similar query is received later, the agent can use the cached information to provide a response instead of running the entire pipeline.

This image shows the user query and the agent's response cached in the Qdrant database. With a local Qdrant instance, you can open the dashboard at http://localhost:6333/dashboard

The next time a user asks a similar question, the agent will reuse the previously generated response, reducing operational costs.

Multi-Modal Capability

In this example, we tested whether the agent could perform a product quality check during a return. As shown, it rejected the return request because the original product did not match the provided image.

Conclusion

To conclude, this Lyzr–Qdrant multi-agent system combines specialized AI agents, retrieval-augmented reasoning, and semantic caching to deliver fast, accurate, and empathetic customer support. It draws context from diverse data sources, reuses answers from past interactions through the cache, and scales to complex, multi-step queries, making it a solid foundation for automated support.

GitHub

You can find the full source code for this multi-agent customer support system at the following GitHub repository:

https://github.com/quamernasim/multi-agentic-customer-support-lyzr-qdrant/

References

  1. https://qdrant.tech/documentation/quickstart/

  2. https://qdrant.tech/documentation/concepts/indexing/

  3. https://qdrant.tech/documentation/examples/llama-index-multitenancy/

  4. https://qdrant.tech/documentation/multimodal-search/

  5. https://qdrant.tech/articles/semantic-cache-ai-data-retrieval/

  6. https://qdrant.tech/articles/sparse-vectors/

  7. https://docs.lyzr.ai/lyzr-automata/getting-started

  8. https://qdrant.tech/documentation/advanced-tutorials/reranking-hybrid-search/

  9. https://www.lyzr.ai
