Analizando correos con Inteligencia Artificial
No hace falta que les cuente que hubo casi 1,9 millones de ataques de phishing en el último año, con 877.536 sólo en el segundo trimestre de 2024. Las estafas de Business Email Compromise (BEC) también han aumentado, con un importe medio solicitado de 89.520 dólares por incidente. Nos ponemos manos a la obra con este mini proyecto, que a mi entender es util y divertido.
Arquitectura
El usuario envía un correo sospechoso a una casilla.
n8n revisa la casilla cada cierto tiempo.
Si encuentra un correo, lo envía para analizar.
La primera API que usamos es Virus Total.
Luego analizamos con Ollama y un modelo preparado para este propósito.
Le pedimos a Ollama que formatee el análisis y cree un HTML.
n8n envía el reporte al usuario que tenía la sospecha.
Aca les dejo el repositorio del proyecto, donde se encontraran con esta estructura.
En la carpeta app estará el siguiente Python que tiene estos tres métodos: /analyze
, /analyze_mail
, /format_text
.
Metodo Analyze Email
# Ruta para analizar el cuerpo del correo
@app.route('/analyze_email', methods=['POST'])
def analyze_email():
try:
# Extraer el contenido del correo desde el cuerpo de la solicitud POST
data = request.get_json() # Asegurarse de que se obtiene un JSON parseado
# Verificar si 'email_content' está en el JSON
email_content = data.get('email_content')
if not email_content:
return jsonify({"error": "El contenido del correo es requerido."}), 400
# Verificar si 'from' está en el JSON
email_from = data.get('from')
if not email_from:
return jsonify({"error": "El campo 'from' es requerido."}), 400
# Construir el mensaje a ser enviado al modelo
model_messages = [
{
"role": "system",
"content": "Analyze the provided email content and metadata to determine if it's a potential phishing attempt. Provide your analysis in a structured format matching the SimplePhishingAnalysis model. Important the response in HTML Format",
},
{
"role": "user",
"content": email_content, # Aquí se incluye el contenido del correo
}
]
# Llamada al modelo local de Ollama para analizar el correo
response = ollama.chat(
model=os.getenv('OLLAMA_MODEL', 'gemma2:9b-instruct-q4_K_M'), # Usar la variable de entorno para el modelo
messages=model_messages
)
# Como `ollama.chat` devuelve una cadena, la parseamos para agregar 'from'
analysis_result = {"result": response, "from": email_from}
return jsonify({"analysis_result": analysis_result})
except Exception as e:
return jsonify({"error": str(e)}), 500
Metodo Analyze Virus Total
# Endpoint para manejar el análisis de archivos
@app.route('/analyze', methods=['POST'])
def analyze_file():
try:
# Verificar si la clave API está cargada
if not API_KEY:
return jsonify({"error": "API Key not found"}), 400
# Verificar si se ha incluido un archivo en la solicitud
if 'file' not in request.files:
return jsonify({"error": "No file part in the request"}), 400
# Obtener el archivo desde la solicitud
file = request.files['file']
# Verificar si el archivo tiene un nombre
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
# Si el archivo existe, proceder con el análisis
if file:
# Guardar el archivo temporalmente para enviarlo a VirusTotal
temp_path = Path(f"/tmp/{file.filename}")
file.save(temp_path)
# Abrir el archivo en modo binario y enviarlo para análisis
with open(temp_path, 'rb') as f:
params = {'apikey': API_KEY}
files = {'file': (file.filename, f)}
response = requests.post(upload_url, files=files, params=params)
# Verificar si la subida fue exitosa
if response.status_code == 200:
result = response.json()
scan_id = result['scan_id']
# Esperar para que el análisis esté listo
time.sleep(10)
# Consultar el reporte utilizando el scan_id
report_params = {'apikey': API_KEY, 'resource': scan_id}
report_response = requests.get(report_url, params=report_params)
# Verificar si la recuperación del reporte fue exitosa
if report_response.status_code == 200:
report_result = report_response.json()
# Obtener el número de motores antivirus que marcaron el archivo como malicioso
positives = report_result.get('positives', 0)
total = report_result.get('total', 0)
# Construir el objeto resultado para retornarlo en formato JSON
file_result = {
"file_name": file.filename,
"scan_id": scan_id,
"positives": positives,
"total": total,
"is_malicious": positives > 0,
"permalink": report_result.get('permalink')
}
return jsonify(file_result), 200
else:
return jsonify({"error": "Error retrieving report from VirusTotal"}), 500
else:
return jsonify({"error": "Error uploading file to VirusTotal"}), 500
except Exception as e:
return jsonify({"error": str(e)}), 500
Metodo Format Text
# Endpoint para formatear texto a HTML
@app.route('/format_text', methods=['POST'])
def format_text():
try:
# Extraer el contenido del texto desde el cuerpo de la solicitud POST
data = request.get_json() # Asegurarse de que se obtiene un JSON parseado
# Verificar si 'text' está en el JSON
text = data.get('text')
if not text:
return jsonify({"error": "El texto es requerido."}), 400
# Verificar si 'from' está en el JSON
email_from = data.get('from')
if not email_from:
return jsonify({"error": "El campo 'from' es requerido."}), 400
# Construir el mensaje a ser enviado al modelo
model_messages = [
{
"role": "system",
"content": (
"Formatea el texto proporcionado en HTML. "
"Asegúrate de que la salida esté bien estructurada, visualmente atractiva, y en español. "
"El HTML debe estar organizado según la siguiente estructura:\n"
"- is_potential_phishing: booleano\n"
"- is_malicious: booleano\n"
"- phishing_probability: enum (BAJA, MEDIA, ALTA)\n"
"- suspicious_elements: lista de objetos (elemento, motivo)\n"
"- recommended_actions: lista de acciones recomendadas\n"
"- explanation: explicación"
)
},
{
"role": "user",
"content": text # Aquí se incluye el texto a formatear
}
]
# Llamada al modelo local de Ollama para formatear el texto
response = ollama.chat(
model=os.getenv('OLLAMA_MODEL', 'gemma2:9b-instruct-q4_K_M'), # Usar la variable de entorno para el modelo
messages=model_messages
)
# Limpiar el texto recibido eliminando '```html\n' al inicio y '```' al final
formatted_html = response.get('message', {}).get('content', "")
formatted_html = formatted_html.replace("```html\n", "").replace("```", "").strip()
# Incluir 'from' en el resultado JSON
result = {
"formatted_html": formatted_html,
"from": email_from # Agregar 'from' al resultado
}
return jsonify(result)
except Exception as e:
return jsonify({"error": str(e)}), 500
Les dejo el Dockerfile para la creación del contenedor. Será importante que el archivo .env
tenga la API Key de Virus Total.
En la raíz encontrarán el docker-compose que tiene n8n. Una vez que lo ejecuten, podrán subir la configuración que dejé en config_n8n
.
docker-compose
para tener un solo archivo de ejecución.Workflow
El flujo de trabajo, en una primera instancia, verifica si el correo reportado tiene o no adjuntos. Si los tiene, realizará una iteración para determinar cuántos deben enviarse a analizar a Virus Total. Unifica las respuestas, tanto del cuerpo del correo como de los adjuntos, para luego crear la respuesta al usuario que tenía la duda. De caso contrario analizara el cuerpo y luego generara la respuesta.
Ollama
Vamos a utilizar Ollama, para ello lo descargamos. En mi caso voy a usar gemma2:9b-instruct-q4_K_M como modelo. ¿Por qué usaremos gemma? Por ser conocido por su gran rendimiento en relación con su tamaño, esa es la justificación.
Para instalar el modelo, ejecutamos ollama pull gemma2:9b-instruct-q4_K_M
.
Aunque la detección de phishing basada en LLM ofrece una gran capacidad de adaptación y comprensión contextual, pero no se puede confiar solo en ella. Para una seguridad completa, es esencial integrar este enfoque con los métodos de detección tradicionales. Las herramientas de análisis estático que señalan Indicadores de Compromiso (IoC) conocidos, como URL sospechosas o archivos adjuntos, siguen siendo componentes vitales de una estrategia de seguridad sólida.
Utilizamos la biblioteca Instructor con Pydantic para crear modelos de datos sólidos para nuestros análisis. Debemos verlo como plantillas para organizar los datos.
Al definir estos modelos por adelantado, nos aseguramos de que los resultados de nuestros análisis estén estructurados de forma coherente.
# Definición de modelos Pydantic para el análisis estructurado
class PhishingProbability(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class SuspiciousElement(BaseModel):
element: str
reason: str
class SimplePhishingAnalysis(BaseModel):
is_potential_phishing: bool
is_malicious: bool
phishing_probability: PhishingProbability
suspicious_elements: List[SuspiciousElement]
recommended_actions: List[str]
explanation: str
Para el prompt del sistema, utilizaremos una instrucción simple:
Analyze the provided email content and metadata to determine if it's a potential phishing attempt. Provide your analysis in a structured format matching the SimplePhishingAnalysis model. Important the response in HTML Format.
# Construir el mensaje a ser enviado al modelo
model_messages = [
{
"role": "system",
"content": "Analyze the provided email content and metadata to determine if it's a potential phishing attempt. Provide your analysis in a structured format matching the SimplePhishingAnalysis model. Important the response in HTML Format",
},
{
"role": "user",
"content": email_content, # Aquí se incluye el contenido del correo
}
]
# Llamada al modelo local de Ollama para analizar el correo
response = ollama.chat(
model=os.getenv('OLLAMA_MODEL', 'gemma2:9b-instruct-q4_K_M'), # Usar la variable de entorno para el modelo
messages=model_messages
)
Prueba de Concepto
Vamos a enviar este correo, para que sea analizado por nuestro Workflow.
Aca vemos las llamadas de nuestra API.
Aquí el análisis que creó nuestra LLM.
¡Uala! Ya tenemos el reporte para el análisis por parte del usuario. Queda bastante por mejorar y perfeccionar nuestro reporte, pero ya es un comienzo. Espero que les sea útil y disfruten modificándolo.
Referencias
https://medium.com/@theofoucher/leveraging-llms-for-phishing-email-detection-8e480dfd3bad
Subscribe to my newsletter
Read articles from Santiago Fernandez directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Santiago Fernandez
Santiago Fernandez
I have a bachelor's degree in Technology from the University of Palermo, a Master in Information Security from the University of Murcia and different certifications such as CISSP | CISM | CDPSE | CCSK | CSX | MCSA | SMAC™️ | DSOE | DEPC | CSFPC | CSFPC | 5x AWS Certified. He is currently CISO at Klar, a Mexican Fintech. He was fortunate to be awarded as CISO of the Year in Argentina in 2021 and was among the Top 100 CISO's in the World in 2022. A lover of new technologies, he has developed a career in DevSecOps and Cloud Security at Eko Party, the largest security conference in Latin America.