Security Detections as Code: Version-Controlled, Tested, Approved


Inspiration
I first encountered the term 'Detections as Code' while exploring security blogs during my daily routine of staying updated with security news. I came across various blog articles such as “Rolling your own Detections as Code with Elastic Security” by Mike Ayenson, Kseniia Ignatovych, and Justin Ibarra, the wonderful folks at Elastic, and “From soup to nuts: Building a Detection-as-Code pipeline” by David French, a Staff Security Engineer at Google. These writings highlighted the key focus of Detections as Code and how it complements the practice of detection engineering. I have been involved in crafting security detections in SIEMs (Splunk/Elastic) based on business needs, and these writings resonated with me because of the common lack of a 'shift-left' approach in this area. There is often an expectation that a security detection is needed for a specific scenario (e.g., "a detection for X"), prompting the creation of the detection using a query language such as SPL or ES|QL, followed by independent testing and implementation in the production SIEM. Unlike traditional code deployments, which typically undergo peer review and approval before being pushed to production, detection logic may at times be implemented with less formal scrutiny.
What is Detections as Code?
Detections as Code involves treating your security detections with the same rigor and best practices as software development. This approach includes several key steps:
First, you develop your detection logic
Next, you rigorously test it within a CI/CD pipeline to ensure reliability and effectiveness
Finally, your detections undergo a peer review process for approval before being deployed in a production environment
This workflow isn't foolproof by any means. Nevertheless, the benefits of Detections as Code are significant. This approach allows you to establish a structured method for managing security detections within your organization. By enforcing the use of a Version Control System, you gain auditing capabilities. Additionally, adhering to a coding standard simplifies the process of writing future detections, making it a more straightforward and repeatable task.
Eager to grasp this concept, I embarked on a brief journey to create a Proof of Concept. Here's how I structured my PoC:
We will be using Terraform to provision our resources.
Terraform is a provisioning tool that enables the use of various providers in a declarative manner. The Terraform files I will utilize are designed to:
Spin up a local Splunk Container, allowing detection engineers to conduct testing in their local environment. They will handle the ingestion of logs into this container.
Create a Splunk Saved Search, which will simulate a correlation search (security detection).
Splunk will function as our SIEM solution, albeit without the enhanced features provided by Splunk Enterprise Security.
Finally, we will use Gitlab as our VCS.
Let’s create our Terraform Files!
Splunk Container
The first set of Terraform files will support a reusable module to spin up a Splunk container locally. We’ll use the kreuzwerker/docker community Terraform provider along with the official Splunk Docker image.
The module will make use of the docker_container resource type, along with several key arguments:
name - The name assigned to the container.
image - The Docker image the container will be based on. This references the id attribute provided by a docker_image data block.
ports - A list of port mappings to bind Splunk’s Web and Management ports from the container to the host.
env - An array of environment variables set on the container that the Splunk Docker Image will use for proper functioning of the container (e.g. accepting the EULA, enabling HTTPS).
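Putting those arguments together, the module might look something like the following HCL sketch. This is a hedged illustration, not the exact module from the repository: the container name, image tag, and environment values are assumptions.

```hcl
terraform {
  required_providers {
    docker = {
      source = "kreuzwerker/docker"
    }
  }
}

# Reference the official Splunk image (tag is an assumption)
data "docker_image" "splunk" {
  name = "splunk/splunk:latest"
}

resource "docker_container" "splunk" {
  name  = "splunkcontainer"
  image = data.docker_image.splunk.id

  # Bind Splunk Web (8000) and the management port (8089) to the host
  ports {
    internal = 8000
    external = 8000
  }
  ports {
    internal = 8089
    external = 8089
  }

  # Environment variables the Splunk image uses at first launch
  # (password value is illustrative only)
  env = [
    "SPLUNK_START_ARGS=--accept-license",
    "SPLUNK_HTTP_ENABLESSL=true",
    "SPLUNK_PASSWORD=changeme123!",
  ]
}
```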
Executing the terraform plan -target=module.splunk command showed no issues, along with the resources our module (splunk) will create. Therefore, we will proceed with terraform apply -target=module.splunk to create our Splunk container!
Running docker logs splunkcontainer informs us that the splunk-ansible playbook was able to execute the tasks required to successfully launch the container. We can confirm this by accessing the container on our localhost on port 8000.
Great, we have our Splunk container, accessible over the web via HTTPS! Now let's move on to the next module, which is responsible for creating the Saved Search that will emulate a Correlation Search (security detection) in Splunk.
Splunk Saved Search / Security Detection
In order to create a saved search in our container, we must utilize the Splunk provider, which is a partner provider in Terraform. The detection engineer will check out their development branch to craft a module for doing so. The splunk_saved_searches resource type will consist of the following key arguments:
name - The name of the saved search.
actions - When you create and schedule a saved search in Splunk, the results of your query can be used to:
Generate a report
Generate an alert
Save it to a dashboard panel
Save it as an event type
For the purposes of this demo, we will be generating a report.
search - This will be the SPL that will power the saved search.
cron_schedule - The interval the saved search will execute at.
dispatch_earliest_time - How far back in time we will look into our logs for events.
dispatch_latest_time - The latest time we should look into our logs for events.
is_scheduled - Accepts a boolean that determines whether the saved search runs on a schedule.
It’s a pretty simple Terraform configuration file, making use of only one resource type. With the exception of the required terraform block, all in all it looks like this…
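As a rough sketch, the configuration might resemble the HCL below. This is a hedged reconstruction, not the repository's actual file: the cron schedule, time windows, and the path passed to file() are illustrative assumptions.

```hcl
terraform {
  required_providers {
    splunk = {
      source = "splunk/splunk"
    }
  }
}

resource "splunk_saved_searches" "suspicious_login" {
  name   = "suspicious_login"

  # Load the SPL from the detections directory (path is an assumption)
  search = file("${path.module}/detections/suspicious_login.spl")

  # No alert actions enabled: the scheduled results serve as a report
  actions                = ""
  cron_schedule          = "*/5 * * * *" # every 5 minutes (assumption)
  dispatch_earliest_time = "-15m"        # assumption
  dispatch_latest_time   = "now"
  is_scheduled           = true
}
```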
Crafting our Detection
The detection engineer will execute Terraform commands from the root module, selectively targeting specific child modules using the -target parameter to manage deployment scope effectively.
In this scenario, I chose to develop a detection rule designed to report on repeated failed login attempts to our Splunk instance. By default, Splunk maintains an internal index named _audit, which records audit-related activity such as authentication failures and modifications to knowledge objects like lookups. These events are typically logged in the file located at $SPLUNK_HOME/var/log/splunk/audit.log.
Accordingly, I authored a detection rule aimed at identifying suspicious login behavior and placed it under the /detections/ directory, naming the file suspicious_login.spl
index=_audit action="login attempt" info="failed"
| bucket _time span=1m
| stats count values(user) as users by _time, action
| where count > 10
Events are bucketed into 1-minute spans; if more than 10 failed login events occur within a single span, we will generate a report.
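To make the threshold logic concrete outside of Splunk, here is a small, hypothetical Python sketch (not part of the pipeline) that mirrors what the SPL does: bucket event timestamps into 1-minute spans and flag any span containing more than 10 failures.

```python
from collections import Counter

def flag_bursts(timestamps, span=60, threshold=10):
    """Bucket epoch timestamps into fixed spans and return the
    buckets whose failed-login count exceeds the threshold."""
    buckets = Counter(int(ts // span) * span for ts in timestamps)
    return {bucket: count for bucket, count in buckets.items() if count > threshold}

# 12 failures inside one minute, 3 spread over a later minute:
# only the first bucket crosses the threshold
events = [100 + i for i in range(12)] + [200, 210, 220]
print(flag_bursts(events))  # → {60: 12}
```

This is the same shape as the SPL's bucket + stats + where pipeline, just expressed in plain Python for intuition.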
To properly validate the detection rule, I needed a way to simulate authentication failures. I decided to write a simple for loop that sends a series of invalid basic authentication credentials to the /services/auth/login endpoint. This RESTful endpoint is used by clients to authenticate and retrieve a session key, which can then be used to access other protected Splunk endpoints. By repeatedly submitting incorrect credentials, I was able to generate the necessary audit events to test the rule's effectiveness.
import os
import requests
import urllib3

# Suppress warnings for the container's self-signed certificate
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

SPLUNK_HOST = os.getenv("SPLUNK_HOST", "localhost")

# Submit 20 invalid login attempts to generate failed-authentication audit events
for i in range(20):
    data = {
        "username": "admin",
        "password": str(i),  # intentionally incorrect password
    }
    requests.post(
        f"https://{SPLUNK_HOST}:8089/services/auth/login",
        data=data,
        verify=False,
    )
Going back to the Search & Reporting app in Splunk and examining the _audit index for action="login attempt" info="failed", we can observe that the audit index ingested events related to failed authentication attempts.
Therefore, we can now run the module responsible for creating the security detection in Splunk to report on failed logins.
We can observe in the UI that our security detection was successfully created in Splunk.
In addition, we can retrieve the Jobs that the Splunk Search Schedule performed to generate the report. Here we can observe the 20 events the report was able to detect based on the query it’s powered by along with the results of that query.
The detection engineer is now confident in contributing a detection titled "suspicious_login" to the detections as code repository, ready for testing within our CI/CD pipeline. Let's explore how this setup will be configured and the thought process behind it.
Setting up the CI/CD
For demonstration purposes, this CI/CD process will be tailored specifically towards the scenario of validating numerous failed logins. The Gitlab runner responsible for creating the jobs in the pipeline will only trigger if there is a Merge Request to the main branch.
rules:
  - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "main"'
    when: always
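For context, the full job might look roughly like the sketch below. This is a hedged reconstruction, not the repository's actual .gitlab-ci.yml: the runner image, service alias, and the test YAML path are assumptions, while ci/test.py, detections/suspicious_login.spl, and tests/synthetic/suspicious_login.py are the paths the article itself uses.

```yaml
stages:
  - test_detection

test_detection:
  stage: test_detection
  image: python:3.11               # assumption
  services:
    - name: splunk/splunk:latest   # assumption: Splunk runs as a CI service
      alias: splunk                # matches the SPLUNK_HOST default in ci/test.py
      variables:
        SPLUNK_START_ARGS: "--accept-license"
        SPLUNK_PASSWORD: $SPLUNK_PASSWORD
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "main"'
      when: always
  script:
    - pip install requests pyyaml
    # the YAML test-parameters path is an assumption
    - python ci/test.py detections/suspicious_login.spl tests/suspicious_login.yml tests/synthetic/suspicious_login.py
```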
When this event occurs, the CI/CD job will run a validation script at the path ci/test.py. The script is responsible for:
Connecting to the Splunk container created in the CI/CD jobs.
Ingesting failed authentication logs into the Splunk container via the test Python script.
Executing the detection (saved search) inside of the Splunk container
Validating the detection results with the help of test YAML parameters (time range)
The contents of ci/test.py can be observed below.
import os
import sys
import json
import time
import yaml
import urllib3
import requests
import subprocess
import xml.etree.ElementTree as ET

SPLUNK_PROTOCOL = "https"
SPLUNK_MGMT_PORT = "8089"
SPLUNK_HOST = os.getenv("SPLUNK_HOST", "splunk")
SPLUNK_URL = f"{SPLUNK_PROTOCOL}://{SPLUNK_HOST}:{SPLUNK_MGMT_PORT}"
SPLUNK_USER = os.getenv("SPLUNK_USER")
SPLUNK_PASSWORD = os.getenv("SPLUNK_PASSWORD")

def get_auth(data: dict) -> str:
    sessionKey = None
    try:
        res = requests.post(SPLUNK_URL + "/services/auth/login", data=data, verify=False)
        if res.status_code == 200:
            sessionKey = ET.fromstring(res.text).findtext("sessionKey")
    except Exception as e:
        print(e)
    return sessionKey

def execute_search(security_detection: str, yaml_test_file: str, logs_script: str, get_session_key: str) -> requests.models.Response:
    # Ingest the synthetic failed-login events, falling back to python3 if needed
    try:
        subprocess.run(["python", logs_script], text=True)
    except Exception:
        subprocess.run(["python3", logs_script], text=True)
    time.sleep(3)  # Wait 3 seconds before opening the detection files and verifying results
    with open(security_detection, 'r') as d:
        spl_query = d.read()
    with open(yaml_test_file, 'r') as t:
        suspicious_login_test_yaml = yaml.safe_load(t.read())
    headers = {
        "Authorization": "Splunk " + get_session_key
    }
    data = {
        "search": spl_query,
        "output_mode": "json",
        "earliest_time": suspicious_login_test_yaml.get('earliest_time'),
        "latest_time": suspicious_login_test_yaml.get('latest_time')
    }
    try:
        search_results = requests.post(SPLUNK_URL + "/services/search/v2/jobs/export", headers=headers, data=data, verify=False)
    except Exception as e:
        print(e)
    return search_results

def main(security_detection: str, yaml_test_file: str, logs_script: str, /):
    verify_results = execute_search(
        security_detection,
        yaml_test_file,
        logs_script,
        get_session_key=get_auth(data={"username": SPLUNK_USER, "password": SPLUNK_PASSWORD})
    )
    for i in verify_results.iter_lines():
        try:
            i = json.loads(i)
            if int(i['result']['count']) >= 20:  # The detection expects a minimum of 20 failed logins
                print("Detection logic passed")
                print(f"Event JSON which resulted in detection logic passing: {i}")
                sys.exit(0)
            else:
                print("Detection logic failed")
                sys.exit(1)
        except KeyError as e:
            print(f"Missing key: {e} in {i}")

if __name__ == '__main__':
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    print("sys.argv: ", sys.argv)
    main(sys.argv[1], sys.argv[2], sys.argv[3])
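The YAML test file passed to ci/test.py only needs the two time-range keys the script reads (earliest_time and latest_time). A minimal example might look like this; the values themselves are assumptions:

```yaml
earliest_time: "-15m"
latest_time: "now"
```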
The detection engineer requests a merge of their development branch into the main branch.
The files that will be involved with this MR are shown below
The CI/CD job is too verbose, so it’s unnecessary to include the entire output. However, the job results are available for viewing here. It consists of a single stage titled, test_detection, and its success can be confirmed by examining the response results from executing the Splunk saved search (security detection).
The returned event JSON confirms the SPL for the Numerous Failed Logins security detection worked as expected. The test script (tests/synthetic/suspicious_login.py) submitted 20 failed login attempts to the Splunk container in the CI/CD environment. The SPL detection logic successfully matched and returned the expected result.
We are now OK to merge the kulmiye_dev branch into main.
Conclusion
Detections as Code is a practice which deserves much broader adoption amongst security teams. By treating security detection logic as code, teams can leverage collaborative workflows like version control and automated testing. Whilst this scenario was heavily focused on a specific use case (numerous failed logins), it can be generalized to ensure all types of security detections are validated and peer-reviewed.
Gitlab Files
https://gitlab.com/kxnice1/dac
References
https://www.elastic.co/blog/detections-as-code-elastic-security
https://www.elastic.co/security-labs/dac-beta-release
https://medium.com/threatpunter/from-soup-to-nuts-building-a-detection-as-code-pipeline-28945015fc38
Written by Kulmiye Egeh