Analizando correos con Inteligencia Artificial

No hace falta que les cuente que hubo casi 1,9 millones de ataques de phishing en el último año, con 877.536 sólo en el segundo trimestre de 2024. Las estafas de Business Email Compromise (BEC) también han aumentado, con un importe medio solicitado de 89.520 dólares por incidente. Nos ponemos manos a la obra con este mini proyecto, que a mi entender es util y divertido.

Arquitectura

  1. El usuario envía un correo sospechoso a una casilla.

  2. n8n revisa la casilla cada cierto tiempo.

  3. Si encuentra un correo, lo envía para analizar.

  4. La primera API que usamos es Virus Total.

  5. Luego analizamos con Ollama y un modelo preparado para este propósito.

  6. Le pedimos a Ollama que formatee el análisis y cree un HTML.

  7. n8n envía el reporte al usuario que tenía la sospecha.

Aca les dejo el repositorio del proyecto, donde se encontraran con esta estructura.

En la carpeta app estará el siguiente Python que tiene estos tres métodos: /analyze, /analyze_mail, /format_text.

Metodo Analyze Email

# Ruta para analizar el cuerpo del correo
@app.route('/analyze_email', methods=['POST'])
def analyze_email():
    try:
        # Extraer el contenido del correo desde el cuerpo de la solicitud POST
        data = request.get_json()  # Asegurarse de que se obtiene un JSON parseado

        # Verificar si 'email_content' está en el JSON
        email_content = data.get('email_content')
        if not email_content:
            return jsonify({"error": "El contenido del correo es requerido."}), 400

        # Verificar si 'from' está en el JSON
        email_from = data.get('from')
        if not email_from:
            return jsonify({"error": "El campo 'from' es requerido."}), 400

        # Construir el mensaje a ser enviado al modelo
        model_messages = [
            {
                "role": "system",
                "content": "Analyze the provided email content and metadata to determine if it's a potential phishing attempt. Provide your analysis in a structured format matching the SimplePhishingAnalysis model. Important the response in HTML Format",
            },
            {
                "role": "user",
                "content": email_content,  # Aquí se incluye el contenido del correo
            }
        ]

        # Llamada al modelo local de Ollama para analizar el correo
        response = ollama.chat(
            model=os.getenv('OLLAMA_MODEL', 'gemma2:9b-instruct-q4_K_M'),  # Usar la variable de entorno para el modelo
            messages=model_messages
        )

        # Como `ollama.chat` devuelve una cadena, la parseamos para agregar 'from'
        analysis_result = {"result": response, "from": email_from}

        return jsonify({"analysis_result": analysis_result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

Metodo Analyze Virus Total

# Endpoint para manejar el análisis de archivos
@app.route('/analyze', methods=['POST'])
def analyze_file():
    try:
        # Verificar si la clave API está cargada
        if not API_KEY:
            return jsonify({"error": "API Key not found"}), 400

        # Verificar si se ha incluido un archivo en la solicitud
        if 'file' not in request.files:
            return jsonify({"error": "No file part in the request"}), 400

        # Obtener el archivo desde la solicitud
        file = request.files['file']

        # Verificar si el archivo tiene un nombre
        if file.filename == '':
            return jsonify({"error": "No selected file"}), 400

        # Si el archivo existe, proceder con el análisis
        if file:
            # Guardar el archivo temporalmente para enviarlo a VirusTotal
            temp_path = Path(f"/tmp/{file.filename}")
            file.save(temp_path)

            # Abrir el archivo en modo binario y enviarlo para análisis
            with open(temp_path, 'rb') as f:
                params = {'apikey': API_KEY}
                files = {'file': (file.filename, f)}
                response = requests.post(upload_url, files=files, params=params)

            # Verificar si la subida fue exitosa
            if response.status_code == 200:
                result = response.json()
                scan_id = result['scan_id']

                # Esperar para que el análisis esté listo
                time.sleep(10)

                # Consultar el reporte utilizando el scan_id
                report_params = {'apikey': API_KEY, 'resource': scan_id}
                report_response = requests.get(report_url, params=report_params)

                # Verificar si la recuperación del reporte fue exitosa
                if report_response.status_code == 200:
                    report_result = report_response.json()

                    # Obtener el número de motores antivirus que marcaron el archivo como malicioso
                    positives = report_result.get('positives', 0)
                    total = report_result.get('total', 0)

                    # Construir el objeto resultado para retornarlo en formato JSON
                    file_result = {
                        "file_name": file.filename,
                        "scan_id": scan_id,
                        "positives": positives,
                        "total": total,
                        "is_malicious": positives > 0,
                        "permalink": report_result.get('permalink')
                    }

                    return jsonify(file_result), 200
                else:
                    return jsonify({"error": "Error retrieving report from VirusTotal"}), 500
            else:
                return jsonify({"error": "Error uploading file to VirusTotal"}), 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500

Metodo Format Text

# Endpoint para formatear texto a HTML
@app.route('/format_text', methods=['POST'])
def format_text():
    try:
        # Extraer el contenido del texto desde el cuerpo de la solicitud POST
        data = request.get_json()  # Asegurarse de que se obtiene un JSON parseado

        # Verificar si 'text' está en el JSON
        text = data.get('text')
        if not text:
            return jsonify({"error": "El texto es requerido."}), 400

        # Verificar si 'from' está en el JSON
        email_from = data.get('from')
        if not email_from:
            return jsonify({"error": "El campo 'from' es requerido."}), 400

        # Construir el mensaje a ser enviado al modelo
        model_messages = [
            {
                "role": "system",
                "content": (
                    "Formatea el texto proporcionado en HTML. "
                    "Asegúrate de que la salida esté bien estructurada, visualmente atractiva, y en español. "
                    "El HTML debe estar organizado según la siguiente estructura:\n"
                    "- is_potential_phishing: booleano\n"
                    "- is_malicious: booleano\n"
                    "- phishing_probability: enum (BAJA, MEDIA, ALTA)\n"
                    "- suspicious_elements: lista de objetos (elemento, motivo)\n"
                    "- recommended_actions: lista de acciones recomendadas\n"
                    "- explanation: explicación"
                )
            },
            {
                "role": "user",
                "content": text  # Aquí se incluye el texto a formatear
            }
        ]

        # Llamada al modelo local de Ollama para formatear el texto
        response = ollama.chat(
            model=os.getenv('OLLAMA_MODEL', 'gemma2:9b-instruct-q4_K_M'),  # Usar la variable de entorno para el modelo
            messages=model_messages
        )

        # Limpiar el texto recibido eliminando '```html\n' al inicio y '```' al final
        formatted_html = response.get('message', {}).get('content', "")
        formatted_html = formatted_html.replace("```html\n", "").replace("```", "").strip()

        # Incluir 'from' en el resultado JSON
        result = {
            "formatted_html": formatted_html,
            "from": email_from  # Agregar 'from' al resultado
        }

        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500

Les dejo el Dockerfile para la creación del contenedor. Será importante que el archivo .env tenga la API Key de Virus Total.

En la raíz encontrarán el docker-compose que tiene n8n. Una vez que lo ejecuten, podrán subir la configuración que dejé en config_n8n.

💡
En mi caso, estoy ejecutando Ollama y la API localmente, pero no sería mala idea agregar ambos contenedores al docker-compose para tener un solo archivo de ejecución.

Workflow

El flujo de trabajo, en una primera instancia, verifica si el correo reportado tiene o no adjuntos. Si los tiene, realizará una iteración para determinar cuántos deben enviarse a analizar a Virus Total. Unifica las respuestas, tanto del cuerpo del correo como de los adjuntos, para luego crear la respuesta al usuario que tenía la duda. De caso contrario analizara el cuerpo y luego generara la respuesta.

Ollama

Vamos a utilizar Ollama, para ello lo descargamos. En mi caso voy a usar gemma2:9b-instruct-q4_K_M como modelo. ¿Por qué usaremos gemma? Por ser conocido por su gran rendimiento en relación con su tamaño, esa es la justificación.

Para instalar el modelo, ejecutamos ollama pull gemma2:9b-instruct-q4_K_M.

Aunque la detección de phishing basada en LLM ofrece una gran capacidad de adaptación y comprensión contextual, pero no se puede confiar solo en ella. Para una seguridad completa, es esencial integrar este enfoque con los métodos de detección tradicionales. Las herramientas de análisis estático que señalan Indicadores de Compromiso (IoC) conocidos, como URL sospechosas o archivos adjuntos, siguen siendo componentes vitales de una estrategia de seguridad sólida.

Utilizamos la biblioteca Instructor con Pydantic para crear modelos de datos sólidos para nuestros análisis. Debemos verlo como plantillas para organizar los datos.

Al definir estos modelos por adelantado, nos aseguramos de que los resultados de nuestros análisis estén estructurados de forma coherente.

# Definición de modelos Pydantic para el análisis estructurado
class PhishingProbability(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

class SuspiciousElement(BaseModel):
    element: str
    reason: str

class SimplePhishingAnalysis(BaseModel):
    is_potential_phishing: bool
    is_malicious: bool
    phishing_probability: PhishingProbability
    suspicious_elements: List[SuspiciousElement]
    recommended_actions: List[str]
    explanation: str

Para el prompt del sistema, utilizaremos una instrucción simple:

Analyze the provided email content and metadata to determine if it's a potential phishing attempt. Provide your analysis in a structured format matching the SimplePhishingAnalysis model. Important the response in HTML Format.

        # Construir el mensaje a ser enviado al modelo
        model_messages = [
            {
                "role": "system",
                "content": "Analyze the provided email content and metadata to determine if it's a potential phishing attempt. Provide your analysis in a structured format matching the SimplePhishingAnalysis model. Important the response in HTML Format",
            },
            {
                "role": "user",
                "content": email_content,  # Aquí se incluye el contenido del correo
            }
        ]

        # Llamada al modelo local de Ollama para analizar el correo
        response = ollama.chat(
            model=os.getenv('OLLAMA_MODEL', 'gemma2:9b-instruct-q4_K_M'),  # Usar la variable de entorno para el modelo
            messages=model_messages
        )

Prueba de Concepto

Vamos a enviar este correo, para que sea analizado por nuestro Workflow.

Aca vemos las llamadas de nuestra API.

Aquí el análisis que creó nuestra LLM.

¡Uala! Ya tenemos el reporte para el análisis por parte del usuario. Queda bastante por mejorar y perfeccionar nuestro reporte, pero ya es un comienzo. Espero que les sea útil y disfruten modificándolo.

Referencias

https://apwg.org/

https://medium.com/@theofoucher/leveraging-llms-for-phishing-email-detection-8e480dfd3bad

0
Subscribe to my newsletter

Read articles from Santiago Fernandez directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Santiago Fernandez
Santiago Fernandez

I have a bachelor's degree in Technology from the University of Palermo, a Master in Information Security from the University of Murcia and different certifications such as CISSP | CISM | CDPSE | CCSK | CSX | MCSA | SMAC™️ | DSOE | DEPC | CSFPC | CSFPC | 5x AWS Certified. He is currently CISO at Klar, a Mexican Fintech. He was fortunate to be awarded as CISO of the Year in Argentina in 2021 and was among the Top 100 CISO's in the World in 2022. A lover of new technologies, he has developed a career in DevSecOps and Cloud Security at Eko Party, the largest security conference in Latin America.