Natural Language Queries on Postgres using CQRS (Elasticsearch) & Spring AI

Arun Balchandran

Overview

Interacting with data through natural language is quickly becoming a mainstream expectation in modern applications. Whether it's asking for "all orders over $50" or "products added last week", enabling users to query systems in natural language can dramatically improve accessibility and user productivity.


Motivation

Why Elasticsearch?

Relational databases like PostgreSQL are great for storing structured, transactional data. However, when it comes to searching large datasets with flexible, full-text queries, they can be limiting and complicated. To address this, we can use the CQRS pattern.

CQRS

CQRS (Command Query Responsibility Segregation) is a design pattern that separates read operations (queries) from write operations (commands). Instead of using the same model to update and read data, you use two distinct models:

  • Commands modify data (like "PlaceOrder" or "UpdateOrder")

  • Queries fetch data (like "GetOrderDetails" or "ListOrders")

This separation makes systems more scalable and maintainable, and it lets you optimize each side individually. You could implement it in various ways, e.g. a single database with a Command module & a Query module interfacing with it on the application side, or, as in this example, a relational database (PostgreSQL) for writes and a fast NoSQL search store (Elasticsearch) for reads. For more information, you can read Martin Fowler's essay on this topic.
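To make the split concrete, here is a rough, hypothetical sketch of what the two sides could look like on the application side (the type names are illustrative and not taken from the sample repository): commands persist to PostgreSQL, while queries are served from Elasticsearch.

// Hypothetical CQRS split: names are illustrative, not from the repo.
import java.util.List;

record PlaceOrderCommand(String productName, int productQty, double productPrice) {}
record OrderView(String id, String productName, int productQty, double productPrice) {}

interface OrderCommandHandler {
    // Write path: validate the command and persist it to PostgreSQL.
    String handle(PlaceOrderCommand command);
}

interface OrderQueryHandler {
    // Read path: serve searches and filters from Elasticsearch.
    List<OrderView> search(String query);
}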

How does Elasticsearch fit the bill?

Elasticsearch complements PostgreSQL by enabling powerful, fast, and fuzzy search capabilities over the same data. By syncing PostgreSQL to Elasticsearch (using Kafka connectors), you get the best of both worlds: reliable data storage and advanced search. This proves useful for applications like semantic search, RAG, and many other search-related use cases.

Complexity of querying

Traditionally, search in applications is implemented using static queries: hardcoded SQL / Query DSL with strict filters or limited parameters (complexity that grows as users ask to query more fields). But users rarely think in 'columns'; they want answers to questions like:

  • “Show all orders over $50 from last month”

  • “Find customers who bought greater than 7 items”

Supporting these kinds of dynamic, user-generated queries is tough with static scripts or code alone. It either leads to overly complex query builders or brittle pattern-matching logic with limited flexibility. Using Spring AI with Elasticsearch, powered by an LLM (OpenAI), we can interpret and translate these queries into dynamically generated DSL, making the system easier to query and, in turn, much more user-friendly and accessible.

In this post, we will:

  • Build an infrastructure pipeline with PostgreSQL, Kafka, and Elasticsearch enabling near real-time searchability

  • Use Spring AI to handle natural language query interpretation

  • Run and test the system with real examples


Setting Up and Running the Project

Prerequisites

Before getting started, make sure the following tools are installed:

  • Java 21 using SDKMAN

  • Docker & Docker Compose

  • OpenAI API Key

  • Your favorite text editor (I’m using IntelliJ for Java & VS Code for React)

  • Optional: If running the UI helper application, you need Node (v22.12.0+ preferred)

Using WSL?
If you're using WSL like I am, the same steps will work for you as well. Just open your favorite terminal (I prefer PowerShell) and log in to WSL.

Now you can follow along with the commands described in the post.

Note: If you have trouble following any of the steps, you can check the application's README.md on GitHub for troubleshooting steps.

1. Clone the Repository & Start Infrastructure

You can find the code here: https://github.com/arunbalachandran/QueryElasticUsingSpringAI

git clone https://github.com/arunbalachandran/QueryElasticUsingSpringAI
cd QueryElasticUsingSpringAI
docker compose up -d

[+] Building 0.0s (0/0)
[+] Running 9/9
 ✔ Network queryelasticusingspringai_dockernet             Created    0.1s
 ✔ Container queryelasticusingspringai-postgres-1          Started    0.9s
 ✔ Container queryelasticusingspringai-zookeeper-1         Started    0.8s
 ✔ Container queryelasticusingspringai-elasticsearch-1     Started    0.7s
 ✔ Container queryelasticusingspringai-kibana-1            Started    1.0s
 ✔ Container queryelasticusingspringai-kafka-1             Started    1.2s
 ✔ Container queryelasticusingspringai-akhq-1              Started    1.6s
 ✔ Container queryelasticusingspringai-debezium-connect-1  Started    1.7s
 ✔ Container queryelasticusingspringai-kafka-connect-1     Started    1.7s

2. Open querybackend

Open the querybackend project in your favorite text editor. I’ve used IntelliJ.

3. Set Your Environment for Java

export OPENAI_API_KEY=your-api-key-here

Or, if you're using IntelliJ, you can set it in your run configuration.
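Depending on how the project is configured, Spring AI typically picks this key up through the spring.ai.openai.api-key property; a common pattern is to reference the environment variable from application.properties:

# assumes the standard Spring AI OpenAI starter property
spring.ai.openai.api-key=${OPENAI_API_KEY}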

4. Initialize Elasticsearch Mapping

Create the order index:

Note: If you're using Postman, import the collection included in the repository and follow along using the endpoints mentioned here.

curl -X PUT "http://localhost:9200/order" \
-H "Content-Type: application/json" \
-d '{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "product_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "product_qty": {
        "type": "integer"
      },
      "product_price": {
        "type": "double"
      },
      "product_description": {
        "type": "text"
      },
      "created_time": {
        "type": "date"
      },
      "updated_time": {
        "type": "date"
      }
    }
  }
}'
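To confirm the mapping was applied (this is the same mapping the application later embeds into the LLM prompt), you can fetch it back:

curl -X GET "http://localhost:9200/order/_mapping?pretty"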

5. Start the Application

./gradlew bootRun

Or, if using IntelliJ, run the ‘bootRun’ Gradle task.


Data Flow: PostgreSQL ➝ Kafka ➝ Elasticsearch

To get data flowing from your relational DB to Elasticsearch:

1. Set Up Kafka Connectors

# PostgreSQL to Kafka (Debezium)
curl -X POST http://localhost:8084/connectors -H "Content-Type: application/json" -d '{
    "name": "postgres-to-kafka-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname" : "querybackend",
        "topic.prefix": "connector",
        "tasks.max": "1",
        "schemas.enable": "false",
        "schema.include.list": "public",
        "table.include.list": "public.orders",
        "signal.data.collection": "public.debezium_signal",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "auto.register.schemas": true,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.partitions": 1,
        "transforms": "extractlatest",
        "transforms.extractlatest.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.extractlatest.field": "after",
        "time.precision.mode": "connect",
        "decimal.handling.mode": "double",
        "heartbeat.interval.ms": "1800000",
        "snapshot.mode": "initial",
        "plugin.name": "pgoutput",
        "slot.name" : "query_slot_orders_01"
    }
}'

# Kafka to Elasticsearch (Confluent Sink)
curl -X POST http://localhost:8084/connectors -H "Content-Type: application/json" -d '{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "connector.public.orders",
    "schemas.enable": false,
    "schema.ignore": true,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,    
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "type.name": "_doc",
    "key.ignore": false,
    "index": "orders",
    "connection.url": "http://elasticsearch:9200",
    "transforms": "InsertKey,ExtractId",
    "transforms.InsertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.InsertKey.fields": "id",
    "transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractId.field": "id",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.drop.deletes": "false",
    "behavior.on.null.values": "delete"
  }
}'
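Before moving on, you can confirm that both connectors registered and their tasks are in the RUNNING state using the Kafka Connect REST API:

curl -s http://localhost:8084/connectors/postgres-to-kafka-connector/status
curl -s http://localhost:8084/connectors/elasticsearch-sink-connector/status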

Querying with Natural Language

Once everything is up and running, it's time to test the actual use case: converting plain English queries into Elasticsearch queries using Spring AI.

Testing the Backend API

Add Test Data

curl -X POST http://localhost:8080/api/v1/orders -H "Content-Type: application/json" -d '{
  "productName": "Peaches",
  "productQty": 8,
  "productPrice": 12,
  "productDescription": "Peaches"
}'

Try a Natural Language Query

The moment of truth! We will harness the power of OpenAI’s API to parse our queries & fetch the data from our database.

Note: I’ve already added a few sample records in the database.

curl -X POST http://localhost:8080/api/v1/elastic/query \
-H "Content-Type: application/json" \
-d '{
    "query": "Get me all the orders with quantity greater than 5"
}'

# Response
[
    {
        "id": "bfacba9b-66bb-480f-b63c-eeb1f0381a21",
        "productName": "Orange",
        "productQty": 6,
        "productPrice": 10.0,
        "productDescription": "California Oranges",
        "createdTime": "2025-05-20T07:52:59.54",
        "updatedTime": "2025-05-20T07:52:59.541"
    },
    {
        "id": "4c3e65ec-8684-480a-a9b0-50925209146d",
        "productName": "Mangoes",
        "productQty": 12,
        "productPrice": 15.0,
        "productDescription": "Alphonso Mangoes",
        "createdTime": "2025-05-20T07:53:30.606",
        "updatedTime": "2025-05-20T07:53:30.606"
    },
    {
        "id": "7f1838cf-577a-42bd-b4a2-d58d3a70cedd",
        "productName": "Apple",
        "productQty": 8,
        "productPrice": 10.0,
        "productDescription": "Apples",
        "createdTime": "2025-05-20T07:52:46.101",
        "updatedTime": "2025-05-20T07:52:46.101"
    },
    {
        "id": "6fa9a0f8-b2c8-4c1c-86a5-b5344d2f2a58",
        "productName": "Peaches",
        "productQty": 7,
        "productPrice": 12.0,
        "productDescription": "Peaches",
        "createdTime": "2025-05-20T07:54:01.823",
        "updatedTime": "2025-05-20T07:54:01.823"
    }
]

Behind the scenes, Spring AI uses the OpenAI API to interpret the query and translate it into a DSL that Elasticsearch understands. The system is designed to be extensible, so you can add more context, prompt templates, or user data as needed.
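The generated DSL varies with the model and prompt, but for the example above you would expect the LLM to produce something along the lines of a simple range query on the product_qty field:

{
  "query": {
    "range": {
      "product_qty": {
        "gt": 5
      }
    }
  }
}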


Seeing it in action

I’ve included a sample React-based UI app that we can use to see the API in action in the context of a real-world application.

Run UI Code

Navigate to the queryui folder & run the following commands:

cd queryui
# install dependencies if you haven't already
npm install
# run the application
npm run dev
# Application starts on port 5173

Navigate to the application at http://localhost:5173 & you should see a UI that looks like this:

Run a query & you should see the results update on the screen:


Under the Hood

The connector

  • There are 2 connectors at play here. The Debezium connector & the Confluent connector.

  • The Debezium connector is responsible for relaying data from Postgres to Kafka.

  • The Confluent connector is responsible for relaying data from Kafka to Elasticsearch.

  • 💡Note: It may be possible to use Debezium for both the source & sink connectors, but it is a bit more cumbersome to set up. This is left as an exercise for the reader.

Spring AI integration

Let’s take a closer look at the PromptService class. This is where the magic happens. It’s responsible for turning user-written natural language queries into structured Elasticsearch queries with the help of Spring AI, which provides a wrapper around the OpenAI Large Language Model (LLM).

Purpose of the Service

The PromptService serves as the bridge between user intent and machine-readable queries. When a user enters a question like “Show me all orders above $50 from last month”, this service constructs a prompt and uses an LLM to generate a precise Elasticsearch query based on the structure of the index.


Key Components

ChatModel

  • Injects the LLM backend provided by Spring AI, configured here to use OpenAI (gpt-4o).

  • This is the model responsible for converting natural language into JSON-based Elasticsearch queries.

ElasticSearchService

  • A helper service to interact with the Elasticsearch index, get mappings, and perform searches.
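The repository has its own implementation, but as a rough sketch (class and method names below are illustrative), such a helper could be built on the Elasticsearch low-level REST client:

// Hypothetical sketch only; the actual ElasticSearchService in the repo may differ.
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

import java.io.IOException;

public class ElasticSearchServiceSketch {

    private final RestClient restClient =
            RestClient.builder(new HttpHost("localhost", 9200, "http")).build();

    // Fetch the raw mapping JSON for the "order" index (embedded into the LLM prompt).
    public String getOrderMapping() throws IOException {
        Response response = restClient.performRequest(new Request("GET", "/order/_mapping"));
        return EntityUtils.toString(response.getEntity());
    }

    // Run an arbitrary Query DSL string (as generated by the LLM) against the index.
    public String search(String queryDsl) throws IOException {
        Request request = new Request("POST", "/order/_search");
        request.setJsonEntity(queryDsl);
        Response response = restClient.performRequest(request);
        return EntityUtils.toString(response.getEntity());
    }
}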

Prompt Initialization (@PostConstruct)

@PostConstruct
public void init() {
    String mapping = elasticsearchService.getOrderMapping();
    this.basePrompt = """
        I need you to convert natural language user queries into elasticsearch queries...
        ...
        """.formatted(mapping);
}
  • On application startup, we retrieve the Elasticsearch mapping for the Order index.

  • This mapping is embedded into the basePrompt. It gives the LLM context about what fields exist (e.g., productName, productPrice, createdTime).

  • The prompt instructs the LLM how to behave, e.g., “don't use markdown”, “output the mapping without formatting,” and “understand the semantics of what’s being asked.”

Processing the User Query

public List<OrderDTO> processPrompt(String userQuery) {
    String fullPrompt = basePrompt + "\nUser query: " + userQuery;
    ...
}
  1. The userQuery (e.g., “Get me all the orders with quantity greater than 5”) is appended to the basePrompt.

  2. This becomes a single, complete prompt that’s fed to the LLM.

public List<OrderDTO> processPrompt(String userQuery) {
    String fullPrompt = basePrompt + "\nUser query: " + userQuery;
    log.info("Prompt being used: {}", fullPrompt);
    ChatResponse chatResponse = chatModel.call(
            new Prompt(
                    fullPrompt,
                    OpenAiChatOptions.builder()
                            .model("gpt-4o")
                            .temperature(1.0)
                            .build()
            )
    );
    String elasticQuery = chatResponse.getResult().getOutput().getText();
    log.info("Elastic query: {}", elasticQuery);
    Map<String, Object> response = elasticsearchService.search(elasticQuery);
    return ElasticMapper.mapToOrderDTO(response);
}
  1. The prompt is sent to the LLM (chatModel.call()), and we expect a raw JSON Elasticsearch query in return.

String elasticQuery = chatResponse.getResult().getOutput().getText();

  2. The LLM's output is extracted as a string. This is the dynamically generated Elasticsearch DSL query.

Map<String, Object> response = elasticsearchService.search(elasticQuery);
return ElasticMapper.mapToOrderDTO(response);

  3. The generated query is executed via elasticsearchService.search().

  4. The raw search results are then mapped to OrderDTO objects for use in the API.


Why This Matters

This pattern allows your application to understand and interpret human-friendly input without rigid UI constraints or static filters. It unlocks much more intuitive experiences, which are especially valuable for user-facing search interfaces & dashboards.

It also abstracts the complexity of search syntax away from the user while still allowing them to perform advanced, flexible queries based on their intent.

Sequence Diagram

To better illustrate the flow, here’s a sequence diagram, showing the order of events in the application:


Wrapping it up

In this post, I’ve demonstrated how we can:

  • Set up a full-stack data ingestion pipeline & search stack using PostgreSQL, Kafka, and Elasticsearch

  • Use Spring AI and OpenAI's LLMs to handle natural language queries

  • Automatically translate plain English into structured Elasticsearch queries

Future Use Cases

Here are a few ideas for extending this:

  • Add user roles and access filters to contextualize results

  • Enable conversational memory with a chat-style interface allowing users to ask follow up questions

  • Use RAG (Retrieval-Augmented Generation) for more dynamic query understanding

  • Integrate Kibana dashboards that go along with the generated results


This foundation gives you a practical starting point for building intelligent, search-driven interfaces. If you're exploring ways to make data more accessible, this is a compelling approach worth trying!

Got ideas or questions? Drop them in the comments! 💬
