Building a Modular Log Analysis Pipeline with Python and CrewAI

Log analysis is critical for monitoring, troubleshooting, and understanding the health of applications. Yet wrangling hundreds of log files can quickly become a daunting, error-prone task. As applications scale and systems become distributed, a streamlined, automated approach to log analysis is not just helpful but essential.
In this blog post, I’ll walk you through the design and implementation of a modular log analysis pipeline in Python. The goal: automatically read logs from a directory, correlate entries by timestamp, and detect errors and exceptions—all in a clean, extensible manner.
Why Build a Log Analysis Pipeline?
Manual log review is tedious and error-prone.
Distributed systems produce massive, fragmented logs.
Quickly surfacing root causes is vital for uptime and user trust.
A good log analysis tool should be:
Automated: No manual searching or copying.
Extensible: Easy to add new analysis, formats, or integrations.
Clear: Output actionable, human-readable insights.
Architecture Overview
The pipeline is composed of three main agents:
LogReaderAgent – Reads all .log files from a directory.
CorrelationAgent – Groups log entries by timestamp for contextual analysis.
FailureDetectionAgent – Detects and summarizes error or exception events.
The main script orchestrates these agents, making the tool easy to run and maintain.
Diagram
flowchart TD
A[Log Directory] --> B(LogReaderAgent)
B --> C(CorrelationAgent)
C --> D(FailureDetectionAgent)
D --> E[Failure Report]
Implementation
1. Reading Log Files
The LogReaderAgent scans a directory and reads every .log file:
import glob
import os

class LogReaderAgent:
    def read_logs(self, logs_dir):
        logs = []
        log_files = glob.glob(os.path.join(logs_dir, "*.log"))
        for path in log_files:
            with open(path, encoding="utf-8") as f:
                logs.extend([line.strip() for line in f if line.strip()])
        return logs
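As a quick sanity check, the glob pattern above can be exercised on its own against a temporary directory (the file names here are made up for illustration):

```python
import glob
import os
import tempfile

# Standalone check of the "*.log" glob pattern used by LogReaderAgent.
with tempfile.TemporaryDirectory() as d:
    for name in ("app.log", "db.log", "notes.txt"):
        with open(os.path.join(d, name), "w", encoding="utf-8") as f:
            f.write("sample line\n")
    matched = sorted(os.path.basename(p) for p in glob.glob(os.path.join(d, "*.log")))
    print(matched)  # ['app.log', 'db.log'] — the .txt file is ignored
```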
2. Correlating Log Entries
The CorrelationAgent groups logs by their timestamp (assuming a standard format at the start of each line):
from collections import defaultdict
import re

class CorrelationAgent:
    def correlate(self, logs):
        events = defaultdict(list)
        for log in logs:
            match = re.match(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', log)
            if match:
                ts = match.group(1)
                events[ts].append(log)
            else:
                events["NO_TIMESTAMP"].append(log)
        return dict(events)
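To see how the grouping behaves, here is the same regex applied to a few made-up lines, including one with no leading timestamp (such as a stack-trace continuation):

```python
import re
from collections import defaultdict

# Made-up log lines to exercise the timestamp regex used by CorrelationAgent.
sample = [
    "2025-05-20 10:27:00 ERROR: Database connection failed",
    "2025-05-20 10:27:00 INFO: Retrying connection",
    "    at module X (stack trace line, no timestamp)",
]

events = defaultdict(list)
for line in sample:
    m = re.match(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', line)
    events[m.group(1) if m else "NO_TIMESTAMP"].append(line)

print(len(events["2025-05-20 10:27:00"]))  # 2
print(events["NO_TIMESTAMP"])
```

Both timestamped lines land in the same bucket, which is what lets the failure detector report them with shared context.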
3. Detecting Failures
The FailureDetectionAgent hunts for lines containing "ERROR" or "Exception":
class FailureDetectionAgent:
    def detect_failures(self, correlated_events):
        failure_report = []
        for ts, logs in correlated_events.items():
            for log in logs:
                if "ERROR" in log or "Exception" in log:
                    failure_report.append(f"[{ts}] {log}")
        if not failure_report:
            return "No failures detected."
        summary = "Failures detected:\n" + "\n".join(failure_report)
        return summary
4. Orchestrating the Pipeline
The main.py script ties everything together:
# Assumes the three agent classes above are defined (or imported) in this file.
if __name__ == "__main__":
    logs_dir = "logs"  # Path to your log directory
    reader = LogReaderAgent()
    logs = reader.read_logs(logs_dir)
    if not logs:
        print("No logs found.")
        exit()
    correlator = CorrelationAgent()
    correlated = correlator.correlate(logs)
    detector = FailureDetectionAgent()
    report = detector.detect_failures(correlated)
    print(report)
Running the Tool
Install the prerequisites (Python 3.7+, plus crewai if you are using its agent base classes):
pip install crewai
Place your log files in a directory named logs/.
Run the analysis:
python main.py
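If you don't have logs handy yet, a throwaway sample file is enough to see the pipeline work end to end (the log contents below are purely illustrative):

```python
import os

# Write one illustrative sample file into the logs/ directory.
os.makedirs("logs", exist_ok=True)
lines = [
    "2025-05-20 10:26:59 INFO: Service started",
    "2025-05-20 10:27:00 ERROR: Database connection failed",
    "2025-05-20 10:27:01 Exception: Timeout occurred in module X",
]
with open(os.path.join("logs", "sample.log"), "w", encoding="utf-8") as f:
    f.write("\n".join(lines) + "\n")
```

Running python main.py against this directory should flag the ERROR and Exception lines.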
Output Example
Failures detected:
[2025-05-20 10:27:00] ERROR: Database connection failed
[2025-05-20 10:27:01] Exception: Timeout occurred in module X
...
Extending the Pipeline
Custom error patterns: Tweak or expand the detection logic.
Visualization: Pipe output to a dashboard or Slack.
Support more formats: Add JSON log parsing or multi-line events.
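For the first of those ideas, here is one possible sketch of a pattern-driven detector. The extra patterns (CRITICAL, the timeout regex) are assumptions for illustration; adjust them to your own log vocabulary:

```python
import re

# Hypothetical drop-in replacement for the substring checks in
# FailureDetectionAgent: patterns are compiled once and matched per line.
FAILURE_PATTERNS = [
    re.compile(r"\bERROR\b"),
    re.compile(r"\bException\b"),
    re.compile(r"\bCRITICAL\b"),                # extra severity (assumption)
    re.compile(r"timed?\s*out", re.IGNORECASE), # catch timeouts too (assumption)
]

def detect_failures(correlated_events):
    report = [
        f"[{ts}] {log}"
        for ts, logs in correlated_events.items()
        for log in logs
        if any(p.search(log) for p in FAILURE_PATTERNS)
    ]
    if not report:
        return "No failures detected."
    return "Failures detected:\n" + "\n".join(report)

events = {"2025-05-20 10:27:01": ["WARN: request timed out", "INFO: all good"]}
print(detect_failures(events))  # flags only the timed-out line
```

Compiling the patterns up front keeps the per-line cost low even across thousands of entries, and new patterns become a one-line change.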
Conclusion
This log analysis pipeline automates and accelerates one of the most repetitive parts of debugging and monitoring: finding the signal in the noise of log files. The modular design means you can add features and scale the tool as your needs grow.
Feel free to fork, adapt, and contribute!
🔗 [https://github.com/aditya-khadanga/ai-agent-crewai]
Happy debugging! 🐍🛠️
#Python #LogAnalysis #DevOps #Automation #OpenSource #Observability
Written by

Aditya Khadanga
A DevOps practitioner dedicated to sharing practical knowledge. Expect in-depth tutorials and clear explanations of DevOps concepts, from fundamentals to advanced techniques. Join me on this journey of continuous learning and improvement!