Using LangChain to Chat with AWS VPC Flow Logs


Introducing the Use Case
Querying logs using natural language is incredibly powerful and can save a tremendous amount of time. Imagine being able to "chat" with your logs and ask questions like:
Who was the last person to connect to my server?
What connections originated from a specific source IP?
Show me all connections from the past five minutes.
Are there any unencrypted protocols (e.g., Telnet, HTTP) in my logs?
In this blog post, we'll focus on analyzing AWS VPC Flow Logs, but the same approach can be applied to any other type of logs.
Quick Introduction to LangChain
This is not an introductory post on LangChain, but rather an explanation of how it fits within our specific use case.
LangChain is a framework designed to simplify the development of applications that integrate large language models (LLMs) with external data sources and services.
At its core, LangChain uses the concept of a "chain"—a sequence of operations connected together to process input and generate a result. Each step in the chain can perform different actions, such as querying a database, calling an API, or processing text. The output of one step seamlessly feeds into the next, ensuring a structured and efficient flow of information.
chain = prompt_template | model | StrOutputParser()
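Conceptually, the `|` operator is just function composition: each step's output becomes the next step's input. A minimal stdlib sketch of that idea, with toy stand-ins for the prompt template, model, and parser (all names here are illustrative, not LangChain APIs):

```python
from functools import reduce

def make_chain(*steps):
    """Compose steps so each one's output feeds the next, mimicking LangChain's `|`."""
    return lambda value: reduce(lambda acc, step: step(acc), steps, value)

# Toy stand-ins for prompt_template, model, and StrOutputParser
to_prompt = lambda text: f"Summarize: {text}"
fake_model = lambda prompt: f"(summary of '{prompt}')"
parse = lambda message: message.strip()

chain = make_chain(to_prompt, fake_model, parse)
print(chain("LangChain basics"))  # (summary of 'Summarize: LangChain basics')
```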
In LangChain, a prompt template defines a structured input for a language model, making it both reusable and customizable. For example, you can create a template that instructs the model to summarize text in a specific format. By using this template, you can pass different text inputs while maintaining a consistent structure, ensuring reliable responses across varied inputs.
It's important to note that the formatted prompt generated from the template is what ultimately gets passed to the Large Language Model (LLM) for processing.
# Define the prompt template
chat_prompt_template = ChatPromptTemplate.from_messages([
    ("system", "You are an assistant that summarizes texts. Please help me summarize the following text."),
    ("human", "{text}")
])

# Use the template with a dynamic input
formatted_prompt = chat_prompt_template.format(text="LangChain simplifies the use of language models in applications.")
print(formatted_prompt)
This code initializes an OpenAI chat model (GPT-4o), making it ready for use in a LangChain application for conversational AI tasks—hence the name ChatOpenAI.
In simple terms, this works just like interacting with ChatGPT via Python, where the formatted prompt from the previous step serves as input to the model.
One key advantage of LangChain is its flexibility—you can seamlessly switch between different models, as illustrated in the snippet below.
model = ChatOpenAI(model="gpt-4o")

# Only if you decide to move to Hugging Face — gpt-4o is not hosted there,
# so point repo_id at an open model instead (example repo id)
model = HuggingFaceHub(repo_id="mistralai/Mistral-7B-Instruct-v0.2")
Finally, StrOutputParser is the last element in our chain. It processes the model’s response, formatting it in a way that is suitable for the user or the application receiving it as input.
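To make the parser's role concrete, here is a toy stand-in written with the stdlib only (the class names are made up for illustration; they are not LangChain's): it pulls the text content out of a message-like response object, or passes plain strings through unchanged.

```python
class SimpleStrOutputParser:
    """Toy stand-in for LangChain's StrOutputParser: extract the text
    content from a model response object, or pass plain strings through."""
    def invoke(self, message):
        return getattr(message, "content", message)

class FakeAIMessage:
    """Minimal stand-in for a chat model's response message."""
    def __init__(self, content):
        self.content = content

parser = SimpleStrOutputParser()
print(parser.invoke(FakeAIMessage("Hello!")))   # Hello!
print(parser.invoke("already a string"))        # already a string
```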
Solution High-Level Overview
Now that we've covered the basics of LangChain, let's explore how it works in the context of chatting with AWS VPC Flow Logs and extracting meaningful insights from network traffic.
VPC Flow Logs capture traffic metadata—including Source IP, Destination IP, Source Port, Destination Port, and Action—for resources within a VPC. These logs are commonly used for troubleshooting, security analysis, and compliance monitoring.
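A default-format flow log record is a single space-separated line with a fixed field order. A small stdlib sketch of splitting one record into named fields (the sample line and addresses are made up for illustration):

```python
# Field order of the default VPC Flow Log format (version 2)
FLOW_LOG_FIELDS = [
    "version", "account_id", "interface_id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log_status",
]

def parse_flow_log(line: str) -> dict:
    """Split a default-format flow log record into named fields."""
    return dict(zip(FLOW_LOG_FIELDS, line.split()))

# Hypothetical record: TCP (protocol 6) traffic to port 22 that was accepted
sample = "2 123456789012 eni-0a1b2c3d 10.1.9.50 10.1.9.108 49152 22 6 10 840 1620140761 1620140821 ACCEPT OK"
record = parse_flow_log(sample)
print(record["srcaddr"], record["dstport"], record["action"])  # 10.1.9.50 22 ACCEPT
```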
Below is a sample of VPC Flow Logs stored in CloudWatch Logs. As you can see, manually extracting relevant insights from this raw data can feel like searching for a needle in a haystack. This is where LangChain comes in, enabling a more intuitive, conversational way to analyze and interpret these logs.
The get_log_events function retrieves VPC Flow Logs from AWS CloudWatch.
import logging

import boto3

logs_client = boto3.client("logs")

def get_log_events(log_group: str, limit: int = 50):
    """Fetches VPC Flow Logs from AWS CloudWatch without needing a log stream."""
    try:
        response = logs_client.filter_log_events(
            logGroupName=log_group, limit=limit
        )
        return response["events"]
    except Exception as e:
        logging.error(f"Error fetching logs: {e}")
        return []
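Note that filter_log_events returns at most one page of results; for larger log groups, CloudWatch paginates via a nextToken field. A minimal sketch of that pagination loop using a stubbed fetch function (the stub and its page contents are hypothetical, standing in for repeated filter_log_events calls):

```python
def fetch_all_events(fetch_page, limit=50):
    """Generic pagination loop mirroring CloudWatch's nextToken pattern.
    `fetch_page(token)` stands in for a logs_client.filter_log_events call."""
    events, token = [], None
    while True:
        page = fetch_page(token)
        events.extend(page["events"])
        token = page.get("nextToken")
        if token is None or len(events) >= limit:
            break
    return events[:limit]

# Hypothetical two-page response for illustration
pages = {
    None: {"events": [1, 2], "nextToken": "t1"},
    "t1": {"events": [3]},
}
print(fetch_all_events(lambda token: pages[token]))  # [1, 2, 3]
```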
This function converts the log event timestamps (which are in milliseconds) into a human-readable UTC format, making it easier to understand when each event occurred.
from datetime import datetime, timezone

def convert_timestamp_to_human_readable(events):
    """Converts timestamps in milliseconds to a human-readable UTC format."""
    return [
        {
            **event,
            "timestamp": datetime.fromtimestamp(
                event["timestamp"] / 1000, timezone.utc
            ).strftime("%Y-%m-%d %H:%M:%S"),
        }
        for event in events
    ]
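To see what the conversion does, here is the same datetime arithmetic applied to a single hypothetical millisecond timestamp:

```python
from datetime import datetime, timezone

# CloudWatch event timestamps are in milliseconds since the epoch
ms_timestamp = 1700000000000  # hypothetical sample value
human = datetime.fromtimestamp(ms_timestamp / 1000, timezone.utc).strftime(
    "%Y-%m-%d %H:%M:%S"
)
print(human)  # 2023-11-14 22:13:20
```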
The chain below consists of several key components:
Prompt – We use a system message to set the context or define the purpose, ensuring the model understands how to interpret the VPC Flow Logs.
Human Message – This adds the relevant query, specifying what information we want to extract.
Model – We use an OpenAI model to process the input and generate responses.
Output Parsing – The String Output Parser formats the model’s response, making it easier to read and interpret.
By connecting these components, the chain enables a seamless workflow where users can query VPC Flow Logs in natural language and receive structured, insightful responses.
prompt_template = ChatPromptTemplate.from_messages([
    ("system", """You are an AI assistant analyzing VPC Flow Logs. The user will ask questions on the logs, and you need to answer accordingly.
Answer only with the extracted information in this format if there is a matching connection or connections:
timestamp: <timestamp>
Source IP: <srcaddr>
Destination IP: <dstaddr>
Source Port: <srcport>
Destination Port: <dstport>
Protocol: <protocol>
Action: <action>
If no matching connections are found, respond with: "There are no matching connections."
All timestamps are in UTC.
VPC Flow Logs:
{logs}"""),
    ("human", "{user_query}")
])
model = ChatOpenAI(model="gpt-4o")
# Create the combined chain using LangChain Expression Language (LCEL)
chain = prompt_template | model | StrOutputParser()
In this section, we:
Retrieve logs by calling the get_log_events function.
Convert timestamps into a human-readable format for better clarity.
Define a set of queries and pass each one to the LangChain pipeline, allowing us to extract meaningful insights from the logs.
This process transforms raw VPC Flow Logs into actionable intelligence using natural language queries.
import sys

events = get_log_events(LOG_GROUP_NAME)
if not events:
    logging.warning("No logs retrieved!")
    sys.exit("No logs retrieved. Exiting the program.")

# Convert timestamps in logs
converted_events = convert_timestamp_to_human_readable(events)

# Sample user queries
user_queries = [
    "Please show the connections with Protocol equals 6 and Destination Port equals 22. Do not return any other connections.",
    "What are the connections from Source IP X.X.X.X?",
    "Show me all the connections",
    "What is the last source IP address that connected to my server with address 10.1.9.108?",
    "Looking at all the flows, can you please list the destination ports that have the action ACCEPT? Only list the destination ports, not the full details.",
    "Can you figure out if there are any clear text (unencrypted protocols) such as HTTP or Telnet being used?",
]

for user_query in user_queries:
    result = chain.invoke({"logs": converted_events, "user_query": user_query})
    print(f"Answer for the question: {user_query}\n")
    print(result)
    print("\n" + "-" * 80 + "\n")
Finally, here are some example outputs for the different queries. Please note that these have been truncated for brevity, but they effectively showcase how the model can extract and present relevant insights from the logs based on the questions asked.
Please show the connections with Protocol equals 6 and Destination Port equals 22. Do not return any other connections.
What are the connections from Source IP X.X.X.X?
What is the last source IP address that connected to my server with address 10.1.9.108?
Can you figure out if there are any clear text (unencrypted protocols) such as HTTP or Telnet being used?
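For the cleartext-protocol question, the LLM's answer can also be cross-checked deterministically. A minimal sketch that flags flows hitting well-known cleartext ports (the port list and the record shape here are assumptions for illustration):

```python
# Well-known ports for common cleartext protocols
CLEARTEXT_PORTS = {21: "FTP", 23: "Telnet", 80: "HTTP"}

def flag_cleartext(records):
    """Return (port, protocol name) pairs for flows to known cleartext ports."""
    hits = []
    for record in records:
        name = CLEARTEXT_PORTS.get(int(record["dstport"]))
        if name:
            hits.append((int(record["dstport"]), name))
    return hits

# Hypothetical parsed records
records = [{"dstport": "22"}, {"dstport": "80"}, {"dstport": "23"}]
print(flag_cleartext(records))  # [(80, 'HTTP'), (23, 'Telnet')]
```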
Code Availability
If you'd like to give this a try, you can find the code here.
Written by

Karim El Jamali
Self-directed and driven technology professional with 15+ years of experience in designing & implementing IP networks. I had roles in Product Management, Solutions Engineering, Technical Account Management, and Technical Enablement.