Restaurant Reservation Agent with Amazon Bedrock and AWS CDK

In this post I will document my first attempt to build an Amazon Bedrock Agent with AWS CDK. The agent uses Retrieval Augmented Generation (RAG) to query a knowledge base containing restaurant descriptions, and it can perform the action of making a reservation on your behalf. I generate synthetic data for the restaurant descriptions. All the code and data can be found in this GitHub repo.

The data source

In order to use RAG, I need some data that the foundation model has not seen before. For that reason I generate synthetic data for restaurant descriptions.

The data is auto-generated by this script. Let me show you an example and then give you more details.

PerfectJapanHouse is a restaurant with japanese cuisine
in North District serving sushi, ramen and tonkatsu.
Their signature dish is sashimi. 
The average price per person is $31. 
Customers have rated its food with 4 stars on average. 
The service has average rating of 3 stars.

I have generated 1000 restaurant descriptions with names made out of combinations of 3 words, so that all of them have unique names. I have decided to concatenate the 3 words without spaces, in order to make the model’s life a bit easier. This way PerfectJapanHouse is less likely to be confused with PerfectJapanGarden.

There are 5 types of cuisines (greek, italian, mexican, indian, japanese) and 4 districts (North, South, East, West). Specific types of restaurants only exist in specific districts. Each restaurant has some dishes and a signature dish. Average prices are generated via a distribution specific to the district. Customer ratings are generated at random.
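As a rough sketch of the generation logic (the word lists and field names here are illustrative, not the exact ones used by my script):

import json
import random
from itertools import product

# Illustrative word lists; the real script uses larger ones
ADJECTIVES = ["Perfect", "Golden", "Happy"]
COUNTRIES = ["Japan", "Italy", "Greece"]
PLACES = ["House", "Garden", "Corner"]

CUISINE_DISHES = {"japanese": ["sushi", "ramen", "tonkatsu", "sashimi"]}
DISTRICT_PRICES = {"North District": (30, 5)}  # district -> (mean, std) of price


def generate_restaurant(name_parts, cuisine, district):
    mean, std = DISTRICT_PRICES[district]
    return {
        # Words are concatenated without spaces to keep names unambiguous
        "name": "".join(name_parts),
        "cuisine": cuisine,
        "district": district,
        "dishes": random.sample(CUISINE_DISHES[cuisine], 3),
        "signature_dish": random.choice(CUISINE_DISHES[cuisine]),
        "average_price": round(random.gauss(mean, std)),
        "food_rating": random.randint(1, 5),
        "service_rating": random.randint(1, 5),
    }


restaurants = [
    generate_restaurant(parts, "japanese", "North District")
    for parts in product(ADJECTIVES, COUNTRIES, PLACES)
]
print(json.dumps(restaurants[0], indent=2))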

I have dumped the metadata of the restaurants in a JSON file. This can be inspected to check the accuracy of the agent, or you can even load it into a pandas DataFrame in order to query it. For example, if I ask the agent “Give me some recommendations for restaurants serving sushi”, do I get the right answers?
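One way to perform that check with pandas could look like this (a quick sketch; the file name and field names are my assumptions about the metadata format):

import pandas as pd

# Load the restaurant metadata; file name and columns are assumptions
df = pd.read_json("data/restaurants/metadata.json")

# Restaurants the agent should recommend for sushi
sushi_restaurants = df[df["dishes"].apply(lambda dishes: "sushi" in dishes)]
print(sushi_restaurants["name"].tolist())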

The Action

I am adding a very simple action that makes a reservation with three pieces of information:

  1. Restaurant name

  2. Main guest name

  3. Number of persons

The action stores the reservation in a DynamoDB table so that I can inspect the outcome.

The CDK Code

I am using AWS CDK in python.

The code I am presenting here is the absolute minimum needed to deploy, interact with, and test an agent that uses the above data. It is not meant to be polished production code; it is just one long Python file. You can read this post and put the pieces together, or you can look at the full code here. Also check the GitHub code for all the Python imports.

I need to acknowledge that I used this post by Dirk Michel for inspiration whenever I was stuck. Thanks also to Dirk for the implementation of the Lambda function that creates the OpenSearch index.

Pick the models

I need to choose a foundation model for the agent and an embedding model for the knowledge base.

agent_foundation_model_id = "amazon.nova-micro-v1:0"

knowledge_base_foundation_model_vector_dimension = 1536
knowledge_base_foundation_model_id = "amazon.titan-embed-text-v1"

Store data in S3

I create an S3 bucket and upload the restaurant descriptions data.

s3_bucket = s3.Bucket(
    self,
    "s3-bucket",
    bucket_name=f"{prefix}-{Aws.ACCOUNT_ID}",
    removal_policy=aws_cdk.RemovalPolicy.DESTROY,
    auto_delete_objects=True,
)

restaurant_descriptions_deployment = s3_deploy.BucketDeployment(
    self,
    "s3-deployment",
    sources=[
        s3_deploy.Source.asset(
            "./data/restaurants/",
        )
    ],
    destination_bucket=s3_bucket,
    prune=True,
    retain_on_delete=False,
    destination_key_prefix="restaurants/",
)

Create the knowledge base role

I create the IAM role for the knowledge base. It needs permissions to invoke the embedding model, to read the data from S3, and to interact with the OpenSearch index that I will create next.

knowledge_base_role = iam.Role(
    self,
    "knowledge-base-role",
    role_name=f"{prefix}-knowledge-base-role",
    assumed_by=iam.PrincipalWithConditions(
        principal=iam.ServicePrincipal("bedrock.amazonaws.com"),
        conditions={
            "StringEquals": {"aws:SourceAccount": Aws.ACCOUNT_ID},
            "ArnLike": {
                "aws:SourceArn": f"arn:aws:bedrock:{Aws.REGION}:{Aws.ACCOUNT_ID}:knowledge-base/*"
            },
        },
    ),
)

embedding_model_arn = f"arn:aws:bedrock:{Aws.REGION}::foundation-model/{knowledge_base_foundation_model_id}"

knowledge_base_role.add_to_policy(
    iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        actions=["bedrock:InvokeModel"],
        resources=[embedding_model_arn],
    )
)

knowledge_base_role.add_to_policy(
    iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        actions=["s3:ListBucket", "s3:GetObject"],
        resources=[
            s3_bucket.bucket_arn,
            s3_bucket.arn_for_objects("restaurants/*"),
        ],
    )
)

knowledge_base_role.add_to_policy(
    iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        actions=["aoss:APIAccessAll"],
        resources=["*"],
    )
)

Create the OpenSearch Collection

I create the OpenSearch Serverless collection. First I need to create the security policies. The policies are not linked to the collection explicitly; it is the naming convention that associates them.

# The security policy name '{collection_name}-security-policy' must be at most 31 characters
collection_name = prefix[:15]

open_search_network_security_policy = aoss.CfnSecurityPolicy(
    self,
    "open-search-network-security-policy",
    # Specific naming convention
    name=f"{collection_name}-security-policy",
    type="network",
    policy=json.dumps(
        [
            {
                "Rules": [
                    {
                        "Resource": [f"collection/{collection_name}"],
                        "ResourceType": "dashboard",
                    },
                    {
                        "Resource": [f"collection/{collection_name}"],
                        "ResourceType": "collection",
                    },
                ],
                "AllowFromPublic": True,
            }
        ],
        indent=2,
    ),
)

open_search_encryption_security_policy = aoss.CfnSecurityPolicy(
    self,
    "open-search-encryption-security-policy",
    # Specific naming convention
    name=f"{collection_name}-security-policy",
    type="encryption",
    policy=json.dumps(
        {
            "Rules": [
                {
                    "Resource": [f"collection/{collection_name}"],
                    "ResourceType": "collection",
                }
            ],
            "AWSOwnedKey": True,
        },
        indent=2,
    ),
)

open_search_collection = aoss.CfnCollection(
    self,
    "open-search-serverless-collection",
    name=collection_name,
    type="VECTORSEARCH",
)

open_search_collection.add_dependency(open_search_encryption_security_policy)
open_search_collection.add_dependency(open_search_network_security_policy)

Create the OpenSearch Index

I need to create an index where the documents (restaurant descriptions) will be indexed. I need to define very specific fields and then use the same names in the knowledge base definition. I use a TriggerFunction to run a Lambda function during deployment of the stack.


vector_index_name = "restaurant-descriptions-vector-index"

vector_index_metadata_field = "AMAZON_BEDROCK_METADATA"
vector_index_text_field = "AMAZON_BEDROCK_TEXT"
vector_index_vector_field = "VECTOR_FIELD"

trigger_function_runtime = _lambda.Runtime.PYTHON_3_12
create_index_trigger_function = triggers.TriggerFunction(
    self,
    "trigger-create-vector-index-lambda",
    runtime=trigger_function_runtime,
    code=_lambda.Code.from_asset(
        "./assets/create_aoss_index_lambda/",
        bundling=aws_cdk.BundlingOptions(
            # NOTE: for this to work an extra step of logging into public ECR is required
            image=trigger_function_runtime.bundling_image,
            command=[
                "bash",
                "-c",
                "pip install --no-cache -r requirements.txt -t /asset-output && cp -au . /asset-output",
            ],
        ),
    ),
    handler="handler.main",
    timeout=Duration.seconds(180),
    environment={
        "COLLECTION_ENDPOINT": open_search_collection.attr_collection_endpoint,
        "VECTOR_INDEX_NAME": vector_index_name,
        "METADATA_FIELD": vector_index_metadata_field,
        "TEXT_FIELD": vector_index_text_field,
        "VECTOR_FIELD": vector_index_vector_field,
        "VECTOR_DIMENSION": str(
            knowledge_base_foundation_model_vector_dimension
        ),
    },
    execute_after=[open_search_collection],
    initial_policy=[
        iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            actions=[
                "aoss:APIAccessAll",
            ],
            resources=[open_search_collection.attr_arn],
        )
    ],
)

The actual Lambda code can be found here. The code above relies on Docker to install the additional libraries defined in requirements.txt. For that reason, you need to be logged in to the public ECR with aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws when you deploy the stack.
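For reference, a minimal sketch of what such a handler could look like with opensearch-py follows. Treat this as an assumption-laden outline rather than the linked implementation; in particular, the HNSW/FAISS method settings are one common choice for Bedrock knowledge bases, not necessarily the one Dirk's code uses.

import os

import boto3
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection


def main(event, context):
    host = os.environ["COLLECTION_ENDPOINT"].replace("https://", "")
    credentials = boto3.Session().get_credentials()
    # Sign requests for the OpenSearch Serverless ("aoss") service
    auth = AWSV4SignerAuth(credentials, os.environ["AWS_REGION"], "aoss")

    client = OpenSearch(
        hosts=[{"host": host, "port": 443}],
        http_auth=auth,
        use_ssl=True,
        connection_class=RequestsHttpConnection,
    )

    # The field names must match the knowledge base field mapping exactly
    client.indices.create(
        index=os.environ["VECTOR_INDEX_NAME"],
        body={
            "settings": {"index": {"knn": True}},
            "mappings": {
                "properties": {
                    os.environ["VECTOR_FIELD"]: {
                        "type": "knn_vector",
                        "dimension": int(os.environ["VECTOR_DIMENSION"]),
                        "method": {"name": "hnsw", "engine": "faiss"},
                    },
                    os.environ["TEXT_FIELD"]: {"type": "text"},
                    os.environ["METADATA_FIELD"]: {"type": "text", "index": False},
                }
            },
        },
    )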

Create the OpenSearch Access Policy

The data access policy grants the knowledge base role and the index-creation trigger function the permissions they need on the collection and its indexes.

open_search_access_policy = aoss.CfnAccessPolicy(
    self,
    "open-search-access-policy",
    # Specific naming convention
    name=f"{collection_name}-policy",
    type="data",
    policy=json.dumps(
        [
            {
                "Rules": [
                    {
                        "Resource": [f"collection/{collection_name}"],
                        "Permission": [
                            "aoss:CreateCollectionItems",
                            "aoss:DeleteCollectionItems",
                            "aoss:UpdateCollectionItems",
                            "aoss:DescribeCollectionItems",
                        ],
                        "ResourceType": "collection",
                    },
                    {
                        "Resource": [f"index/{collection_name}/*"],
                        "Permission": [
                            "aoss:CreateIndex",
                            "aoss:DeleteIndex",
                            "aoss:UpdateIndex",
                            "aoss:DescribeIndex",
                            "aoss:ReadDocument",
                            "aoss:WriteDocument",
                        ],
                        "ResourceType": "index",
                    },
                ],
                "Principal": [
                    knowledge_base_role.role_arn,
                    create_index_trigger_function.role.role_arn,
                ],
                "Description": "data-access-rule",
            }
        ],
        indent=2,
    ),
)
create_index_trigger_function.execute_after(open_search_access_policy)

Create the Knowledge Base

Now it is time to define the knowledge base.

restaurant_descriptions_knowledge_base = bedrock.CfnKnowledgeBase(
    self,
    "knowledge-base-restaurant-descriptions",
    name=f"{prefix}-descriptions-knowledge-base",
    role_arn=knowledge_base_role.role_arn,
    knowledge_base_configuration=bedrock.CfnKnowledgeBase.KnowledgeBaseConfigurationProperty(
        type="VECTOR",
        vector_knowledge_base_configuration=bedrock.CfnKnowledgeBase.VectorKnowledgeBaseConfigurationProperty(
            embedding_model_arn=embedding_model_arn,
        ),
    ),
    storage_configuration=bedrock.CfnKnowledgeBase.StorageConfigurationProperty(
        type="OPENSEARCH_SERVERLESS",
        opensearch_serverless_configuration=bedrock.CfnKnowledgeBase.OpenSearchServerlessConfigurationProperty(
            collection_arn=open_search_collection.attr_arn,
            field_mapping=bedrock.CfnKnowledgeBase.OpenSearchServerlessFieldMappingProperty(
                metadata_field=vector_index_metadata_field,
                text_field=vector_index_text_field,
                vector_field=vector_index_vector_field,
            ),
            vector_index_name=vector_index_name,
        ),
    ),
)
restaurant_descriptions_knowledge_base.add_dependency(open_search_collection)
create_index_trigger_function.execute_before(
    restaurant_descriptions_knowledge_base
)

Create the Data Source

This is how I define the data source.


restaurant_descriptions_data_source = bedrock.CfnDataSource(
    self,
    "knowledge-base-data-source-restaurant-descriptions",
    name=f"{prefix}-data-source",
    knowledge_base_id=restaurant_descriptions_knowledge_base.attr_knowledge_base_id,
    # We will delete the collection anyway.
    # If we do not RETAIN, the CloudFormation stack cannot be deleted smoothly.
    data_deletion_policy="RETAIN",
    data_source_configuration=bedrock.CfnDataSource.DataSourceConfigurationProperty(
        s3_configuration=bedrock.CfnDataSource.S3DataSourceConfigurationProperty(
            bucket_arn=s3_bucket.bucket_arn,
            inclusion_prefixes=["restaurants/descriptions/"],
        ),
        type="S3",
    ),
    vector_ingestion_configuration=bedrock.CfnDataSource.VectorIngestionConfigurationProperty(
        chunking_configuration=bedrock.CfnDataSource.ChunkingConfigurationProperty(
            chunking_strategy="FIXED_SIZE",
            fixed_size_chunking_configuration=bedrock.CfnDataSource.FixedSizeChunkingConfigurationProperty(
                max_tokens=300, overlap_percentage=20
            ),
        )
    ),
)
restaurant_descriptions_data_source.add_dependency(
    restaurant_descriptions_knowledge_base
)
restaurant_descriptions_data_source.node.add_dependency(
    restaurant_descriptions_deployment
)

After the code above has been deployed, if you log in to the AWS Console you will notice that the data source needs to be synced. I can trigger the sync with an AwsCustomResource that makes an AwsSdkCall.

sync_data_source = cr.AwsCustomResource(
    self,
    "sync-data-source",
    on_create=cr.AwsSdkCall(
        service="bedrock-agent",
        action="startIngestionJob",
        parameters={
            "dataSourceId": restaurant_descriptions_data_source.attr_data_source_id,
            "knowledgeBaseId": restaurant_descriptions_knowledge_base.attr_knowledge_base_id,
        },
        physical_resource_id=cr.PhysicalResourceId.of("Parameter.ARN"),
    ),
    policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
        resources=cr.AwsCustomResourcePolicy.ANY_RESOURCE
    ),
)

sync_data_source.grant_principal.add_to_principal_policy(
    iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        actions=[
            "bedrock:StartIngestionJob",
            "iam:CreateServiceLinkedRole",
            "iam:PassRole",
        ],
        resources=["*"],
    )
)

Create the reservations table

Next I am creating a DynamoDB table where reservations will be stored. This gives me some kind of storage where I can inspect the outcome of the agent’s action.

reservations_table = dynamodb.TableV2(
    self,
    f"{prefix}-reservations",
    partition_key=dynamodb.Attribute(
        name="restaurant_name", type=dynamodb.AttributeType.STRING
    ),
    sort_key=dynamodb.Attribute(
        name="main_guest_name", type=dynamodb.AttributeType.STRING
    ),
    removal_policy=aws_cdk.RemovalPolicy.DESTROY,
)

Create the reservations lambda function

First I define the role for the lambda

reservations_lambda_role = iam.Role(
    self,
    "reservations-lambda-role",
    role_name=f"{prefix}-reservations-lambda-role",
    assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
    managed_policies=[
        iam.ManagedPolicy.from_aws_managed_policy_name(
            "service-role/AWSLambdaBasicExecutionRole"
        )
    ],
)

reservations_lambda_role.add_to_policy(
    iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        actions=[
            "dynamodb:BatchGetItem",
            "dynamodb:BatchWriteItem",
            "dynamodb:ConditionCheckItem",
            "dynamodb:PutItem",
            "dynamodb:DescribeTable",
            "dynamodb:DeleteItem",
            "dynamodb:GetItem",
            "dynamodb:Scan",
            "dynamodb:Query",
            "dynamodb:UpdateItem",
        ],
        resources=[reservations_table.table_arn],
    )
)

and then the actual function

reservations_lambda = _lambda.Function(
    self,
    "reservations-lambda",
    runtime=_lambda.Runtime.PYTHON_3_12,
    handler="handler.main",
    code=_lambda.Code.from_asset("./assets/reservations_lambda/"),
    role=reservations_lambda_role,
    description="Lambda function for Bedrock Agent Actions related to reservations",
    environment={"DYNAMODB_TABLE_NAME": reservations_table.table_name},
)

The code of the handler is the following:

import os
import boto3
import json

DYNAMODB_TABLE_NAME = os.environ["DYNAMODB_TABLE_NAME"]

dynamodb_client = boto3.client("dynamodb")


def _get_parameter(event, param_name):
    # Extract a named parameter's value from the agent's invocation event
    return next(p for p in event["parameters"] if p["name"] == param_name)["value"]


def main(event, context):

    print(json.dumps(event, indent=4))

    restaurant_name = _get_parameter(event, "restaurant_name")
    main_guest_name = _get_parameter(event, "main_guest_name")
    number_of_persons = _get_parameter(event, "number_of_persons")

    # DynamoDB expects numeric ("N") values to be passed as strings
    dynamodb_client.put_item(
        TableName=DYNAMODB_TABLE_NAME,
        Item={
            "restaurant_name": {"S": restaurant_name},
            "main_guest_name": {"S": main_guest_name},
            "number_of_persons": {"N": number_of_persons},
        },
    )

    return {
        "messageVersion": "1.0",
        "response": {
            "actionGroup": event["actionGroup"],
            "function": event["function"],
            "functionResponse": {
                "responseBody": {"TEXT": {"body": "Reservation was made successfully"}}
            },
        },
        "sessionAttributes": event["sessionAttributes"],
        "promptSessionAttributes": event["promptSessionAttributes"],
    }

The agent IAM role

The agent needs to be able to invoke the foundation model and use the knowledge base. It also needs to invoke the reservations Lambda function, but that is granted through a resource-based policy on the Lambda function itself, which I add at the end.

agent_role = iam.Role(
    self,
    "agent-role",
    role_name=f"{prefix}-agent-role",
    assumed_by=iam.PrincipalWithConditions(
        principal=iam.ServicePrincipal("bedrock.amazonaws.com"),
        conditions={
            "StringEquals": {"aws:SourceAccount": Aws.ACCOUNT_ID},
            "ArnLike": {
                "aws:SourceArn": f"arn:aws:bedrock:{Aws.REGION}:{Aws.ACCOUNT_ID}:agent/*"
            },
        },
    ),
)

agent_role.add_to_policy(
    iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        actions=["bedrock:InvokeModel"],
        resources=[
            f"arn:aws:bedrock:{Aws.REGION}::foundation-model/{agent_foundation_model_id}"
        ],
    )
)
agent_role.add_to_policy(
    iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        actions=["bedrock:Retrieve"],
        resources=[
            restaurant_descriptions_knowledge_base.attr_knowledge_base_arn
        ],
    )
)

Creating the Agent

Finally it is time to put all the pieces together and create the agent. Thankfully, the agent construct has an auto_prepare=True option, so I do not need to make an SDK call to prepare it. For the action, I chose to define the schema with a function schema.

agent = bedrock.CfnAgent(
    self,
    "ai-agent",
    agent_name=f"{prefix}-agent",
    foundation_model=agent_foundation_model_id,
    idle_session_ttl_in_seconds=600,
    instruction=(
        "You are an agent that helps me to find the right restaurant and then make a reservation. "
        "You are polite, patient and accurate. Your answers are short and to the point."
    ),
    agent_resource_role_arn=agent_role.role_arn,
    auto_prepare=True,
    knowledge_bases=[
        bedrock.CfnAgent.AgentKnowledgeBaseProperty(
            description=(
                "Restaurant descriptions with district, cuisine, dishes and signature dish."
                "Includes average price and customer scores."
                "1 star is the lowest score and 5 stars is the highest."
            ),
            knowledge_base_id=restaurant_descriptions_knowledge_base.attr_knowledge_base_id,
            knowledge_base_state="ENABLED",
        )
    ],
    action_groups=[
        bedrock.CfnAgent.AgentActionGroupProperty(
            action_group_name="MakeRestaurantReservation",
            description="Make a restaurant reservation",
            action_group_executor=bedrock.CfnAgent.ActionGroupExecutorProperty(
                lambda_=reservations_lambda.function_arn
            ),
            function_schema=bedrock.CfnAgent.FunctionSchemaProperty(
                functions=[
                    bedrock.CfnAgent.FunctionProperty(
                        name="make_restaurant_reservation",
                        parameters={
                            "restaurant_name": bedrock.CfnAgent.ParameterDetailProperty(
                                type="string",
                                description="the name of the restaurant to be reserved",
                                required=True,
                            ),
                            "main_guest_name": bedrock.CfnAgent.ParameterDetailProperty(
                                type="string",
                                description="the name of the person making the reservation",
                                required=True,
                            ),
                            "number_of_persons": bedrock.CfnAgent.ParameterDetailProperty(
                                type="integer",
                                description="number of persons for the reservation. must be positive number.",
                                required=True,
                            ),
                        },
                    )
                ]
            ),
            skip_resource_in_use_check_on_delete=True,
        )
    ],
)

Allow the agent to make reservations

As the last bit of permissions, I add a resource-based policy statement to the Lambda function that allows the Bedrock service to invoke it on behalf of the agent.

reservations_lambda.add_permission(
    "allow-invoke-bedrock-agent",
    principal=iam.ServicePrincipal("bedrock.amazonaws.com"),
    action="lambda:InvokeFunction",
    source_arn=agent.attr_agent_arn,
)

Deploy

At this point I deploy my stack and have a draft version of the agent to test. I can interact with the agent in the Test window of the management console using the TestAlias. For a production agent, an alias would need to be created to deploy a specific version of the agent.
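If you prefer testing from code instead of the console, an invocation with boto3 could look like this (the agent ID would come from the console or a stack output; TSTALIASID is the built-in test alias that points to the draft version):

import uuid

import boto3

client = boto3.client("bedrock-agent-runtime")

response = client.invoke_agent(
    agentId="YOUR_AGENT_ID",  # placeholder; copy the real ID from the console
    agentAliasId="TSTALIASID",
    sessionId=str(uuid.uuid4()),
    inputText="Give me some recommendations for restaurants serving sushi",
)

# The answer comes back as an event stream of chunks
completion = "".join(
    event["chunk"]["bytes"].decode("utf-8")
    for event in response["completion"]
    if "chunk" in event
)
print(completion)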

Demo

The best way to close this blog post is with a small demo of what the agent can now do.

At this stage, I check the DynamoDB table and the reservation is there, exactly as requested!
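For a quick inspection, a boto3 scan does the job (the table name is generated by CDK, so the value below is a placeholder):

import boto3

dynamodb_client = boto3.client("dynamodb")

# Placeholder; look up the generated table name in the console or stack outputs
response = dynamodb_client.scan(TableName="YOUR_RESERVATIONS_TABLE_NAME")
for item in response["Items"]:
    print(
        item["restaurant_name"]["S"],
        item["main_guest_name"]["S"],
        item["number_of_persons"]["N"],
    )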
