Deep Dive – The Python Connector (Part 3): Advanced Parsing & Multi-Table Routing

Topaz HurvitzTopaz Hurvitz
19 min read

(Author's Note: While the technical implementations described here are accurate, identifying details and specific scenarios have been modified for confidentiality. The core challenges and solutions reflect real-world experiences in building production-grade connectors.)*

Introduction: Taming the Log Zoo

In the previous posts, we established how to reliably retrieve logs from AWS S3 (Episode 3) and push data securely to Microsoft Sentinel's API (Episode 4). However, real-world security logs rarely arrive in a uniform format. Firewalls, VPNs, proxies, and custom applications all speak different dialects – CSV, JSON, key-value pairs, unstructured text, often compressed. Simply dumping this varied data into a single Sentinel table creates chaos.

This post tackles the critical processing stage: enhancing our Python connector to intelligently handle diverse log formats. We'll delve into techniques for:

  1. Detecting different log structures (e.g., JSON vs. CSV).

  2. Parsing data fields accurately based on the detected format.

  3. Normalizing key information (like timestamps, IPs, event types) into a consistent standard.

  4. Routing parsed logs to specific, appropriate custom tables within Microsoft Sentinel.

Mastering this stage is crucial for data integrity, effective threat correlation, and efficient analysis within Sentinel.

The Challenge: Why One Size Doesn't Fit All

Ingesting raw, unparsed logs from multiple sources directly into a generic Sentinel table leads to problems:

  • Data Integrity Issues: Inconsistent field names (source_ip vs. srcIP vs. client_address), varied timestamp formats, and missing fields make automated analysis unreliable.

  • Poor Correlation: It becomes difficult to correlate events across different log types if common fields aren't normalized (e.g., identifying the same user across VPN logs and application logs).

  • Inefficient Queries: Searching through a massive, mixed-log table is slower and more complex than querying smaller, type-specific tables.

  • Operational Overhead: Security analysts waste time manually cleaning, translating, and interpreting logs instead of focusing on threat hunting.

Solution: A Flexible Parsing and Routing Framework

To address this, we need a framework within our Python connector that can adapt to different log types before sending them to Sentinel.

Key Concepts:

  • Parsing: The process of analyzing raw log text (a string) and extracting structured data fields (like IP addresses, usernames, actions) based on the log's format (e.g., splitting a CSV line, interpreting a JSON object).

  • Normalization: Transforming extracted data into a standardized format. This includes converting timestamps to a common standard (like ISO 8601 UTC), validating and cleaning IP addresses, and mapping vendor-specific event names to a common taxonomy (e.g., 'login failed', 'auth failure', 'Login Attempt Failure' all map to AUTH_FAILURE).

  • Schema: The defined structure (field names and data types) of a table in Microsoft Sentinel (or any database). Our parsed data must conform to the target table's schema.

  • Multi-Table Ingestion: Sending different types of logs to different custom tables in Sentinel (e.g., Firewall_Logs_CL, VPN_Logs_CL, App_Activity_CL) instead of a single generic table. This improves query performance and data organization.

  • Routing: The logic that decides, based on the log type or source, which specific Sentinel table the processed log record should be sent to.

  • Heuristics: Rules of thumb or educated guesses used when absolute certainty isn't possible. Our format detection uses heuristics (e.g., "if it looks like JSON, treat it as JSON").

The Processing Workflow

Here’s a conceptual diagram of the data flow within this processing stage:

graph TD
    A["πŸ“¦ Raw Log Line (Bytes/String)"] --> B{"🧠 Detect Format (Heuristics)"}

    B -- "🟦 JSON?" --> C["πŸ“„ Parse JSON"]
    B -- "🟧 CSV?" --> D["πŸ“„ Parse CSV"]
    B -- "πŸ› οΈ Other/Custom?" --> E["πŸ” Apply Custom Parser (Regex/Split)"]

    C --> F{"πŸ” Normalize Fields"}
    D --> F
    E --> F

    F --> G{"πŸ“‚ Determine Log Type\n(from filename/prefix)"}
    G --> H{"πŸ“„ Get Sentinel Table Config"}
    H --> I{"βœ… Validate Parsed Data vs Schema"}

    I -- "βœ” Valid" --> J{"🚦 Route to Target Table"}
    I -- "❌ Invalid" --> K["⚠️ Log Error / Dead-Letter Queue"]

    J -- "πŸ›‘οΈ Firewall" --> L["πŸ—‚οΈ Firewall_CL"]
    J -- "πŸ”’ VPN" --> M["πŸ—‚οΈ VPN_CL"]
    J -- "πŸ“ Generic" --> N["πŸ—‚οΈ Generic_CL"]

    %% Styles
    style A fill:#e6f7ff,stroke:#333,stroke-width:1px
    style B fill:#fff2cc,stroke:#333,stroke-width:1px
    style C fill:#d9f2e6,stroke:#333,stroke-width:1px
    style D fill:#d9f2e6,stroke:#333,stroke-width:1px
    style E fill:#f9e0e0,stroke:#333,stroke-width:1px
    style F fill:#d9d9f3,stroke:#333,stroke-width:1px
    style G fill:#f0f5ff,stroke:#333,stroke-width:1px
    style H fill:#f0f5ff,stroke:#333,stroke-width:1px
    style I fill:#ffffcc,stroke:#333,stroke-width:1px
    style J fill:#e6ffe6,stroke:#333,stroke-width:1px
    style K fill:#ffe6e6,stroke:#333,stroke-width:1px
    style L fill:#ccd9ff,stroke:#333,stroke-width:1px
    style M fill:#ccd9ff,stroke:#333,stroke-width:1px
    style N fill:#ccd9ff,stroke:#333,stroke-width:1px

1. Foundational Parsing and Normalization

We start with a base class providing common utilities for format detection and basic field normalization.

import json
import re # Regular expression library
import logging
from typing import Any, Optional

class BaseLogParser:
    """
    Provides basic log format detection and field normalization utilities.
    Intended to be inherited by specific log type parsers.
    """
    def __init__(self):
        self.log = logging.getLogger(__name__)
        # Define supported formats this base class attempts to detect
        self.supported_formats = ['json', 'csv', 'custom']
        # Field mappings would typically be defined in inheriting classes
        self.field_mappings = {}
        self.log.info("BaseLogParser initialized.")

    def detect_format(self, sample_line: str) -> str:
        """
        Attempts to detect the log format using simple heuristics.
        NOTE: This is a basic heuristic approach and may not be accurate for all logs.
              More robust detection might involve checking multiple lines or using libraries.

        Args:
            sample_line (str): A single line of log data (as a string).

        Returns:
            str: The detected format ('json', 'csv', 'custom').
        """
        if not isinstance(sample_line, str) or not sample_line.strip():
            self.log.warning("Cannot detect format of empty or non-string input.")
            return 'custom' # Default to custom if input is unusable

        line = sample_line.strip()

        # Heuristic 1: Try parsing as JSON
        try:
            # Attempt to load the line as a JSON object. Success means it's likely JSON.
            json.loads(line)
            self.log.debug("Detected format as JSON.")
            return 'json'
        except json.JSONDecodeError:
            # If JSON parsing fails, continue to other checks.
            self.log.debug("Input is not valid JSON.")
            pass # Not JSON

        # Heuristic 2: Check for CSV characteristics (commas, lack of JSON chars)
        # This is very basic - assumes commas are delimiters and avoids lines with JSON brackets/braces.
        # A more robust CSV check would use the `csv` module's Sniffer class.
        if ',' in line and not re.search(r'[{}[\]]', line):
             # Contains commas and lacks typical JSON characters -> likely CSV
             self.log.debug("Detected format as CSV (heuristic).")
             return 'csv'

        # Heuristic 3: Default to 'custom' if not clearly JSON or CSV-like.
        self.log.debug("Format not identified as JSON or CSV, defaulting to 'custom'.")
        return 'custom'

    def normalize_field(self, field_name: str, field_value: Any) -> Optional[Any]:
        """
        Performs basic normalization on a field value.
        - Converts empty strings to None.
        - Removes ASCII control characters from strings.
        - Trims leading/trailing whitespace from strings.

        Args:
            field_name (str): The name of the field (for potential future logic).
            field_value (Any): The raw value extracted from the log.

        Returns:
            Optional[Any]: The normalized value, or None.
        """
        # Rule 1: Represent empty/missing values consistently as None.
        if field_value == '':
            return None

        # Rule 2: Clean string values
        if isinstance(field_value, str):
            # Remove ASCII control characters (like NUL, ETX) that can cause issues.
            # \x00-\x1F are control chars, \x7F is DEL.
            cleaned_value = re.sub(r'[\x00-\x1F\x7F]', '', field_value)
            # Normalize whitespace: replace multiple spaces with one, trim ends.
            normalized_value = ' '.join(cleaned_value.split())
            # Return None if the string becomes empty after cleaning.
            return normalized_value if normalized_value else None

        # Rule 3: Return other types (int, float, bool) as is for now.
        # More specific normalization (e.g., ensuring numeric types) can be added here
        # or in the LogFormatHandler class.
        return field_value

Summary: The BaseLogParser provides initial, heuristic-based format detection (detect_format) and essential field cleaning (normalize_field). It serves as a foundation upon which specific parsers for different log types can be built, inheriting these common utilities. The format detection is intentionally simple and might require refinement based on the specific logs encountered.

2. Handling Specific Data Types (Timestamps, IPs, Events)

Different log sources represent common data types like timestamps or event categories differently. A dedicated handler standardizes these.

import datetime
import ipaddress # Library for validating IP addresses
import logging
from typing import Optional, Any

class LogFormatHandler:
    """
    Provides methods for parsing and normalizing specific common data types
    found in various log formats (timestamps, IPs, event categories).
    """
    def __init__(self):
        self.log = logging.getLogger(__name__)
        # Define a list of expected timestamp formats (order matters - try most common first).
        # Add more formats as needed based on your log sources.
        self.timestamp_formats = [
            '%Y-%m-%dT%H:%M:%S.%fZ',      # ISO 8601 format with Z (UTC)
            '%Y-%m-%dT%H:%M:%SZ',         # ISO 8601 format without millis
            '%Y-%m-%dT%H:%M:%S.%f%z',     # ISO 8601 with timezone offset
            '%Y-%m-%d %H:%M:%S,%f',       # Common format with comma-millis
            '%b %d %Y %H:%M:%S',          # Syslog-like format (e.g., "Apr 14 2025 13:44:29")
            '%Y/%m/%d %H:%M:%S',          # Another common format
            '%m/%d/%Y %I:%M:%S %p',       # e.g., 04/14/2025 01:44:29 PM
            '%Y%m%d%H%M%S',               # Compact format
            '%s'                           # Unix epoch timestamp (seconds)
            # Add other formats encountered in your logs here
        ]
        # Example mapping for standardizing event categories. Expand significantly for real use.
        self.event_mappings = {
            # Authentication Success
            'login_successful': 'Authentication_Success', 'successful login': 'Authentication_Success',
            'auth ok': 'Authentication_Success', 'vpn connect': 'Authentication_Success',
            # Authentication Failure
            'login failed': 'Authentication_Failure', 'auth failure': 'Authentication_Failure',
            'vpn failed': 'Authentication_Failure', 'invalid credentials': 'Authentication_Failure',
            # Access Control
            'access denied': 'Access_Denied', 'firewall block': 'Access_Denied',
            'unauthorized': 'Access_Denied', 'connection denied': 'Access_Denied',
            'firewall permit': 'Access_Permitted', 'connection allowed': 'Access_Permitted',
            # Add many more based on CIM or other standards (Process events, Network events etc.)
        }
        self.log.info("LogFormatHandler initialized with common normalization rules.")


    def parse_timestamp(self, timestamp_str: str) -> Optional[datetime.datetime]:
        """
        Attempts to parse a timestamp string using a list of known formats.
        Returns a timezone-aware datetime object (UTC) if successful, otherwise None.

        Args:
            timestamp_str (str): The raw timestamp string from the log.

        Returns:
            Optional[datetime.datetime]: Timezone-aware datetime object (UTC) or None.
        """
        if not isinstance(timestamp_str, str) or not timestamp_str.strip():
            self.log.warning("Received empty or non-string timestamp_str.")
            return None

        cleaned_ts = timestamp_str.strip()

        for fmt in self.timestamp_formats:
            try:
                # Handle epoch format separately
                if fmt == '%s':
                    dt_naive = datetime.datetime.fromtimestamp(float(cleaned_ts), tz=datetime.timezone.utc)
                    return dt_naive # Already UTC

                # Parse using strptime
                dt_naive = datetime.datetime.strptime(cleaned_ts, fmt)

                # Make the datetime object timezone-aware (assume UTC if no timezone info)
                if dt_naive.tzinfo is None:
                     # Assume UTC if format doesn't include timezone info (like %z or Z)
                     # Or, based on knowledge of the source log, assume local time and convert
                     dt_aware = dt_naive.replace(tzinfo=datetime.timezone.utc)
                else:
                     # If timezone info is present, convert to UTC
                     dt_aware = dt_naive.astimezone(datetime.timezone.utc)

                self.log.debug(f"Successfully parsed timestamp '{cleaned_ts}' using format '{fmt}'. Result: {dt_aware.isoformat()}")
                return dt_aware # Return the first successful parse
            except (ValueError, TypeError):
                # ValueError if format doesn't match, TypeError if input isn't string/float for epoch
                self.log.debug(f"Timestamp '{cleaned_ts}' did not match format '{fmt}'.")
                continue # Try the next format

        self.log.warning(f"Unable to parse timestamp string '{timestamp_str}' using any known format.")
        return None # Return None if no format matches

    def clean_ip_address(self, ip_str: str) -> Optional[str]:
        """
        Validates and cleans an IPv4 or IPv6 address string.

        Args:
            ip_str (str): The potential IP address string.

        Returns:
            Optional[str]: The cleaned, valid IP address string, or None if invalid.
        """
        if not isinstance(ip_str, str) or not ip_str.strip():
            return None

        cleaned_ip = ip_str.strip()
        try:
            # Use the ipaddress library for robust validation (handles IPv4 and IPv6).
            ip = ipaddress.ip_address(cleaned_ip)
            # Return the canonical string representation of the IP.
            self.log.debug(f"Validated IP address: {str(ip)}")
            return str(ip)
        except ValueError:
            # The string is not a valid IP address.
            self.log.warning(f"Invalid IP address format encountered: '{ip_str}'.")
            return None

    def normalize_event_type(self, event_description: str) -> str:
        """
        Maps a raw event description string to a standardized category name.

        Args:
            event_description (str): The event description from the log.

        Returns:
            str: The standardized event category (e.g., 'Authentication_Success') or 'Unknown'.
        """
        if not isinstance(event_description, str) or not event_description.strip():
            return 'Unknown' # Default for empty input

        # Perform case-insensitive matching after basic cleaning.
        lookup_key = event_description.strip().lower()
        standard_event = self.event_mappings.get(lookup_key, 'Unknown')

        if standard_event == 'Unknown':
             self.log.debug(f"Event description '{event_description}' mapped to 'Unknown'. Consider updating event_mappings.")
        else:
             self.log.debug(f"Mapped event '{event_description}' to '{standard_event}'.")

        return standard_event

    # Add other normalization functions as needed (e.g., for MAC addresses, hostnames, URLs, severities)

Summary: The LogFormatHandler focuses on cleaning and standardizing common but varied data fields. It attempts to parse timestamps using multiple common formats (parse_timestamp), validates and cleans IP addresses (clean_ip_address), and maps vendor-specific event descriptions to a predefined standard taxonomy (normalize_event_type). This standardization is crucial for effective correlation and analysis in Sentinel.

3. Routing Logs to Specific Sentinel Tables

Different logs belong in different places. Firewall logs might go to Firewall_CL, VPN logs to VPN_CL, etc. The SentinelRouter manages this logic based on configuration.

graph TD
    A["Processed Log + Log Type - firewall"] --> B["SentinelRouter"]
    B --> C["Lookup firewall in table configs"]
    C --> D["Get Config: table = Custom_Firewall_CL, fields = [...]"]
    D --> E{"Validate Data vs Required Fields"}
    E -- OK --> F["Target Table: Custom_Firewall_CL"]
    E -- Missing Fields --> G["Log Error / Dead-Letter"]

    %% Styles
    style A fill:#e6f7ff,stroke:#333,stroke-width:1px
    style B fill:#f0f0f0,stroke:#333,stroke-width:1px
    style C fill:#f0f5ff,stroke:#333,stroke-width:1px
    style D fill:#ffffff,stroke:#333,stroke-width:1px
    style E fill:#ffffcc,stroke:#333,stroke-width:1px
    style F fill:#ccffcc,stroke:#333,stroke-width:1px
    style G fill:#ffe6e6,stroke:#333,stroke-width:1px
import logging
from typing import Dict, Any, Optional, List

class SentinelRouter:
    """
    Manages routing logic for sending processed logs to the correct
    Microsoft Sentinel custom table based on log type and configuration.
    Also handles schema validation before routing.
    """
    def __init__(self, table_configurations: Dict[str, Dict[str, Any]]):
        """
        Initializes the router with table configurations.

        Args:
            table_configurations (Dict): A dictionary where keys are internal log type names
                                         (e.g., 'firewall', 'vpn') and values are dictionaries
                                         containing table details like 'table_name', 'required_fields'.
                                         Example:
                                         {
                                             'firewall': {'table_name': 'Firewall_CL', 'required_fields': ['TimeGenerated', 'SourceIP']},
                                             'vpn': {'table_name': 'VPN_CL', 'required_fields': ['TimeGenerated', 'User']}
                                         }
        """
        self.log = logging.getLogger(__name__)
        if not isinstance(table_configurations, dict):
             raise TypeError("table_configurations must be a dictionary.")
        self.table_configs = table_configurations
        self.log.info(f"SentinelRouter initialized with configurations for log types: {list(self.table_configs.keys())}")
        # Define a default table name for logs that don't match any specific configuration.
        self.default_table_name = "Generic_Logs_CL"


    def validate_schema(self, log_data: Dict[str, Any], log_type: str) -> bool:
        """
        Validates if the processed log data contains the required fields
        for the specified log type's target table schema.

        Args:
            log_data (Dict): The dictionary representing the processed log record.
            log_type (str): The internal type name used to look up the configuration.

        Returns:
            bool: True if the schema is valid, False otherwise. Logs errors if invalid.
        """
        config = self.table_configs.get(log_type)

        # If no specific config exists for this log_type, assume schema is valid
        # as it will likely go to a generic table with fewer strict requirements.
        if not config:
            self.log.debug(f"No specific table config for log type '{log_type}'. Skipping strict schema validation.")
            return True

        required_fields = config.get('required_fields', [])
        if not required_fields:
            self.log.debug(f"No required fields defined for log type '{log_type}'. Schema considered valid.")
            return True # No required fields to check

        # Find fields required by the config but missing from the actual log data.
        missing_fields = [
            field for field in required_fields
            if field not in log_data or log_data[field] is None # Also check for None values if needed
        ]

        if missing_fields:
            self.log.error(f"Schema validation FAILED for log type '{log_type}' targeting table '{config.get('table_name', 'N/A')}'. "
                           f"Missing required fields: {missing_fields}. Log data keys: {list(log_data.keys())}")
            return False
        else:
            self.log.debug(f"Schema validation PASSED for log type '{log_type}' targeting table '{config.get('table_name', 'N/A')}'.")
            return True

    def get_target_table(self, log_type: str, log_data: Optional[Dict[str, Any]] = None) -> str:
        """
        Determines the target Sentinel custom table name based on the log type.
        Optionally allows for more complex routing based on log_data content in the future.

        Args:
            log_type (str): The internal type name (e.g., 'firewall', 'vpn', 'unknown_app').
            log_data (Optional[Dict], optional): The processed log data, allowing for content-based routing if needed. Defaults to None.

        Returns:
            str: The name of the target Sentinel Custom Log table (e.g., "Firewall_CL").
        """
        base_config = self.table_configs.get(log_type)

        if not base_config or 'table_name' not in base_config:
            self.log.warning(f"No specific table configuration or table_name found for log type '{log_type}'. "
                           f"Routing to default table: {self.default_table_name}")
            return self.default_table_name

        target_table = base_config['table_name']

        # --- Example of more complex routing (optional) ---
        # You could add rules based on log content here. For example:
        # if log_data and log_data.get('Severity') == 'Critical':
        #    target_table = f"{target_table}_Critical" # Route critical logs to a separate table/partition
        # elif log_data and 'PII_Detected' in log_data:
        #     target_table = f"{target_table}_Sensitive"

        self.log.debug(f"Routing log type '{log_type}' to target table: {target_table}")
        return target_table

Summary: The SentinelRouter uses a configuration dictionary (table_configs) to manage metadata about target Sentinel tables. The validate_schema method checks if a processed log record contains the fields required by its target table's configuration. The get_target_table method determines the correct Sentinel table name (e.g., Firewall_CL, VPN_CL) based on the log type, falling back to a default table if no specific configuration exists. This ensures data lands in the right place and meets basic structural requirements.

4. Example: Custom Firewall Parser

Building on the base class, here's a simplified parser for a hypothetical pipe-delimited (|) firewall log.

class FirewallLogParser(BaseLogParser):
    """
    Parses a specific format of pipe-delimited firewall logs.
    Inherits from BaseLogParser for common normalization.
    """
    def __init__(self):
        super().__init__() # Initialize the base class
        self.log = logging.getLogger(__name__)
        # Define the expected field names *in the order they appear* in the raw log line.
        # This MUST match the structure of the logs being parsed.
        self.field_names = [
            'log_timestamp', 'hostname', 'action', 'protocol', 'src_ip',
            'src_port', 'dst_ip', 'dst_port', 'rule_name', 'bytes_sent', 'info'
        ]
        # Define mapping from raw field names (from self.field_names) to Sentinel field names.
        # Use target Sentinel field names (or an intermediate standard schema).
        self.field_mappings = {
            'log_timestamp': 'TimeGenerated', # Special field Sentinel uses
            'hostname': 'DeviceHostName',
            'action': 'FirewallAction',
            'protocol': 'Protocol',
            'src_ip': 'SourceIP',
            'src_port': 'SourcePort',
            'dst_ip': 'DestinationIP',
            'dst_port': 'DestinationPort',
            'rule_name': 'RuleName',
            'bytes_sent': 'BytesSent',
            # 'info' field might be ignored or mapped to a generic 'AdditionalInfo' field
        }
        # Initialize the format handler for specific type normalizations
        self.formatter = LogFormatHandler()
        self.log.info("FirewallLogParser initialized.")

    def parse(self, log_line: str) -> Optional[Dict[str, Any]]:
        """
        Parses a single pipe-delimited firewall log line.

        Args:
            log_line (str): The raw log line string.

        Returns:
            Optional[Dict[str, Any]]: A dictionary of parsed and normalized fields,
                                      ready for schema validation and routing, or None on failure.
        """
        if not isinstance(log_line, str) or not log_line.strip():
             return None

        fields = log_line.strip().split('|')

        # Basic validation: Check if the number of fields matches expectations.
        if len(fields) != len(self.field_names):
            self.log.error(f"Firewall log line has incorrect number of fields. Expected {len(self.field_names)}, got {len(fields)}. Line: '{log_line[:100]}...'")
            return None # Parsing failed

        # Create a dictionary from the raw field names and split values.
        raw_data = dict(zip(self.field_names, fields))
        processed_data = {}

        # Iterate through the raw data, apply mappings, and normalize values.
        for raw_name, raw_value in raw_data.items():
            # Skip fields we don't intend to map.
            if raw_name not in self.field_mappings:
                continue

            target_name = self.field_mappings[raw_name]

            # Apply basic normalization (remove control chars, trim whitespace, empty to None)
            normalized_value = self.normalize_field(raw_name, raw_value)

            # Skip if normalization resulted in None (empty field) unless the field is TimeGenerated
            if normalized_value is None and target_name != 'TimeGenerated':
                 continue

            # Apply specific type normalization using LogFormatHandler
            if target_name == 'TimeGenerated':
                # Attempt to parse the timestamp string. Must succeed for Sentinel.
                timestamp_dt = self.formatter.parse_timestamp(normalized_value)
                if timestamp_dt:
                     # Sentinel expects TimeGenerated in ISO 8601 format string.
                     processed_data[target_name] = timestamp_dt.isoformat()
                else:
                     # If TimeGenerated cannot be parsed, the record is likely unusable.
                     self.log.error(f"Failed to parse mandatory TimeGenerated field from value '{normalized_value}' in line: '{log_line[:100]}...'")
                     return None # Reject record if essential timestamp is bad
            elif target_name in ['SourceIP', 'DestinationIP']:
                 processed_data[target_name] = self.formatter.clean_ip_address(normalized_value)
            elif target_name == 'FirewallAction':
                 processed_data[target_name] = self.formatter.normalize_event_type(normalized_value)
            elif target_name in ['SourcePort', 'DestinationPort', 'BytesSent']:
                 # Attempt to convert numeric fields to integers
                 try:
                      processed_data[target_name] = int(normalized_value)
                 except (ValueError, TypeError):
                      self.log.warning(f"Could not convert field '{target_name}' value '{normalized_value}' to integer in line: '{log_line[:100]}...'")
                      processed_data[target_name] = None # Or keep as string, depending on schema
            else:
                # For other mapped fields, use the basic normalized value.
                processed_data[target_name] = normalized_value


        # Filter out any None values after processing (optional, depends on schema)
        # processed_data = {k: v for k, v in processed_data.items() if v is not None}

        return processed_data

Summary: This FirewallLogParser demonstrates how to create a specific parser inheriting from BaseLogParser. It defines the expected field order (field_names) for a pipe-delimited format and a field_mappings dictionary to translate raw names to target Sentinel schema names. The parse method splits the line, applies basic normalization via the inherited method, uses LogFormatHandler for specific types (timestamps, IPs), performs type conversions (e.g., to integer), and returns a structured dictionary ready for validation and routing. It includes basic validation for field count and timestamp parsing.

5. Monitoring the Parsing Process

Just like other pipeline stages, monitoring parsing health is vital. Errors here mean data quality issues or data loss.

(Note: The following code assumes an asynchronous environment (async/await) and a hypothetical MonitorClient designed to send custom metrics, likely to Azure Monitor. In a real implementation, you would use the appropriate Azure Monitor SDK, such as azure-monitor-ingestion for custom metrics, adapting the calls and potentially removing async/await if using a synchronous framework.)*

import logging
import time
# Assuming MonitorClient is a placeholder for Azure Monitor interaction
# from azure.monitor.ingestion import LogsIngestionClient # Potential library
# from azure.identity import DefaultAzureCredential # For auth

class LogIngestionMonitor:
    """
    Placeholder for monitoring log parsing and ingestion metrics.
    Likely sends data to Azure Monitor Custom Metrics or Logs.
    """
    def __init__(self, environment: str = 'production'):
        """
        Initializes the monitor.

        Args:
            environment (str): Deployment environment tag (e.g., 'production', 'staging').
        """
        self.log = logging.getLogger(__name__)
        self.environment = environment
        # --- Placeholder for actual Azure Monitor Client Initialization ---
        # try:
        #     # Example using azure-monitor-ingestion (adapt endpoint and credential)
        #     credential = DefaultAzureCredential()
        #     self.metrics_client = LogsIngestionClient(endpoint="YOUR_DCE_ENDPOINT", credential=credential)
        #     self.log.info("Initialized Azure Monitor LogsIngestionClient.")
        # except Exception as e:
        #     self.log.error(f"Failed to initialize Azure Monitor client: {e}", exc_info=True)
        #     self.metrics_client = None # Ensure client is None if init fails
        self.metrics_client = None # Keeping it None for this example structure
        self.log.info(f"LogIngestionMonitor initialized for environment: {environment}.")

        # Define alert thresholds (example values)
        self.alert_thresholds = {
            'parsing_error_rate': 0.05,  # Alert if > 5% of records fail parsing in a batch/period
            'avg_processing_time_ms': 50, # Alert if avg time per record exceeds 50ms
            'schema_validation_failure_rate': 0.02, # Alert if > 2% fail schema validation
        }


    # Note: Using 'async' here assumes the surrounding application uses an async framework (like asyncio)
    async def track_parsing_metrics(self, log_type: str, batch_metrics: Dict[str, Any]):
        """
        Tracks metrics related to a batch of parsed logs.

        Args:
            log_type (str): The type of log being processed (e.g., 'firewall').
            batch_metrics (Dict): A dictionary containing metrics for the batch, e.g.,
                                 {'processed': 1000, 'errors': 15, 'schema_failures': 5, 'duration_ms': 1234.5}
        """
        if not self.metrics_client:
             self.log.warning("Metrics client not initialized. Skipping metric tracking.")
             return

        # --- Placeholder for actual metric sending logic ---
        # This would involve formatting the metrics according to the Azure Monitor API requirements
        # and using self.metrics_client.upload(...) or similar.
        # Example structure - adapt to actual API:
        custom_metrics_payload = [
             {
                 "Time": datetime.datetime.now(datetime.timezone.utc).isoformat(), # Timestamp for the metric
                 "LogType": log_type,
                 "RecordsProcessed": batch_metrics.get('processed', 0),
                 "ParsingErrors": batch_metrics.get('errors', 0),
                 "SchemaValidationFailures": batch_metrics.get('schema_failures', 0),
                 "ProcessingTimeMs": batch_metrics.get('duration_ms', 0),
                 "Environment": self.environment,
                 # Add other relevant dimensions/metrics
             }
        ]
        try:
            # self.log.debug(f"Sending metrics to Azure Monitor: {custom_metrics_payload}")
            # await self.metrics_client.upload(rule_id="YOUR_DCR_IMMUTABLE_ID", stream_name="YOUR_STREAM_NAME", logs=custom_metrics_payload)
            self.log.info(f"Successfully tracked metrics for log type '{log_type}' (Placeholder - No actual send).")
            # --- End Placeholder ---
        except Exception as e:
             self.log.error(f"Failed to post parsing metrics for log type '{log_type}': {e}", exc_info=True)


    def check_alert_conditions(self, metrics: Dict[str, Any], log_type: str) -> List[Dict[str, Any]]:
        """
        Evaluates batch metrics against predefined thresholds and returns alerts.

        Args:
            metrics (Dict): Metrics collected for a batch (e.g., {'processed': 1000, 'errors': 60, 'schema_failures': 10, 'duration_ms': 60000}).
            log_type (str): The log type these metrics relate to.

        Returns:
            List[Dict[str, Any]]: A list of alert dictionaries if thresholds are breached.
        """
        alerts = []
        processed = metrics.get('processed', 0)

        if processed == 0: # Avoid division by zero
             return alerts

        # Calculate error rates
        error_rate = metrics.get('errors', 0) / processed
        schema_failure_rate = metrics.get('schema_failures', 0) / processed
        avg_time = metrics.get('duration_ms', 0) / processed if processed > 0 else 0

        # Check thresholds
        if error_rate > self.alert_thresholds['parsing_error_rate']:
            alerts.append({
                'type': 'HighParsingErrorRate', 'log_type': log_type,
                'message': f"Parsing error rate {error_rate:.2%} exceeds threshold ({self.alert_thresholds['parsing_error_rate']:.0%})",
                'severity': 'High', 'value': error_rate
            })

        if schema_failure_rate > self.alert_thresholds['schema_validation_failure_rate']:
             alerts.append({
                 'type': 'HighSchemaFailureRate', 'log_type': log_type,
                 'message': f"Schema validation failure rate {schema_failure_rate:.2%} exceeds threshold ({self.alert_thresholds['schema_validation_failure_rate']:.0%})",
                 'severity': 'Medium', 'value': schema_failure_rate
             })

        if avg_time > self.alert_thresholds['avg_processing_time_ms']:
             alerts.append({
                 'type': 'HighProcessingLatency', 'log_type': log_type,
                 'message': f"Average processing time {avg_time:.2f}ms exceeds threshold ({self.alert_thresholds['avg_processing_time_ms']}ms)",
                 'severity': 'Medium', 'value': avg_time
             })

        if alerts:
             self.log.warning(f"Alert conditions met for log type '{log_type}': {alerts}")

        return alerts

Summary: The LogIngestionMonitor class (presented conceptually) is responsible for tracking the health of the parsing and routing process. It would ideally integrate with a monitoring service like Azure Monitor to send custom metrics (parsing errors, schema failures, processing time). The check_alert_conditions method provides logic to evaluate these metrics against thresholds, enabling automated alerting on issues like high error rates or latency, ensuring operational visibility into this critical stage.

Conclusion: The Path to Meaningful Data

Handling diverse log formats and routing them effectively is essential for transforming raw logs into actionable security intelligence in Microsoft Sentinel. By implementing a flexible parsing framework, specific data normalization routines, schema validation, and intelligent routing, you significantly enhance data quality, improve query performance, and empower your security analysts.

This detailed processing ensures that the data arriving in Sentinel is clean, consistent, and ready for advanced analytics and threat hunting.

What challenges have you faced when parsing and normalizing diverse security logs? Share your strategies and questions below!

Stay tuned for the next episode in this series, where we'll discuss deployment strategies, operational best practices, and maintaining the connector in a production environment.

0
Subscribe to my newsletter

Read articles from Topaz Hurvitz directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Topaz Hurvitz
Topaz Hurvitz