Building a Scalable Data Pipeline with Logstash and OpenSearch on Kubernetes


In today's data-driven world, organizations need robust pipelines to collect, process, and analyze data efficiently. This blog post walks through implementing a scalable data pipeline using Logstash and OpenSearch on Kubernetes, a good fit for organizations looking to enhance their data capabilities with open-source solutions.
Architecture Overview
Our architecture leverages three key components:
Data Source: A MariaDB database
Data Processing: Custom Logstash pipelines for ETL (Extract, Transform, Load) operations
Data Storage & Analysis: OpenSearch cluster with Dashboards for visualization
Here's how data flows through the system:
MariaDB Database → Logstash Pipelines → OpenSearch Cluster → OpenSearch Dashboards
Part 1: Setting Up the Logstash Pipeline
Custom Logstash Image
We start by creating a custom Logstash Docker image that includes the OpenSearch output plugin and MariaDB JDBC driver:
FROM docker.elastic.co/logstash/logstash:9.0.0
RUN logstash-plugin install logstash-output-opensearch
COPY --chown=logstash:logstash bin/mariadb-3.5.3.jar /usr/share/logstash/logstash-core/lib/jars/mariadb-3.5.3.jar
CMD ["logstash"]
This image is pushed to Docker Hub for use in our Kubernetes deployment.
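For reference, building and pushing the image might look like the following (the repository name matches the image referenced later in the StatefulSet; substitute your own Docker Hub repository and tag):

docker build -t mycustomlogstash/logstash-custom:latest .
docker push mycustomlogstash/logstash-custom:latest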
Kubernetes Configuration
We use Kustomize to manage our Kubernetes configuration with a base/overlay structure:
.
├── Dockerfile
├── bin
│   └── mariadb-3.5.3.jar
├── k8s
│   ├── base
│   │   ├── configmap.yaml
│   │   ├── kustomization.yaml
│   │   ├── service.yaml
│   │   └── statefulset.yaml
│   └── overlay
│       └── test
│           ├── kustomization.yaml
│           ├── sealedsecrets.yaml
│           └── version.yaml
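The base holds the environment-agnostic manifests, and the overlay layers test-specific configuration on top. A minimal sketch of what the two kustomization.yaml files might contain (the exact contents are an assumption inferred from the tree above):

# k8s/base/kustomization.yaml (sketch)
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - configmap.yaml
  - service.yaml
  - statefulset.yaml

# k8s/overlay/test/kustomization.yaml (sketch)
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - ../../base
  - sealedsecrets.yaml
patches:
  - path: version.yaml   # assumption: version.yaml patches the image tag for the test environment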
Our StatefulSet configuration includes:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: logstash
  labels:
    app: logstash
spec:
  serviceName: logstash
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      securityContext:
        runAsUser: 1000
        runAsGroup: 1000
        fsGroup: 1000
      containers:
        - name: logstash
          image: mycustomlogstash/logstash-custom:latest
          imagePullPolicy: Always
          ports:
            - containerPort: 5000
          env:
            - name: CONFIG_SUPPORT_ESCAPE
              value: "true"
            - name: ELASTICSEARCH_URL
              value: "https://opensearch-endpoint:9200"
          volumeMounts:
            - name: logstash-data
              mountPath: /usr/share/logstash/data
            - name: opensearch-ca
              mountPath: /usr/share/logstash/certs/
              readOnly: true
            - name: logstash-config
              mountPath: /usr/share/logstash/config/pipelines.yml
              readOnly: true
              subPath: pipelines.yml
            - name: logstash-pipelines
              mountPath: /usr/share/logstash/pipelines
              readOnly: true
  volumeClaimTemplates:
    - metadata:
        name: logstash-data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 5Gi
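The volumeMounts above reference ConfigMap and Secret volumes that also need to be declared in the pod spec. A sketch of the corresponding volumes section, assuming a logstash-pipelines ConfigMap for the pipeline files and a Secret holding the OpenSearch CA certificate (both of those names are assumptions):

      # goes under spec.template.spec, alongside containers
      volumes:
        - name: logstash-config
          configMap:
            name: logstash-config
        - name: logstash-pipelines
          configMap:
            name: logstash-pipelines      # assumed ConfigMap holding mypipeline.conf
        - name: opensearch-ca
          secret:
            secretName: opensearch-ca     # assumed Secret containing ca.crt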
We expose the service with:
apiVersion: v1
kind: Service
metadata:
  name: logstash
spec:
  selector:
    app: logstash
  ports:
    - protocol: TCP
      port: 5000
      targetPort: 5000
  type: ClusterIP
Pipeline Configuration
Our ConfigMap configuration includes:
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-config
data:
  pipelines.yml: |-
    - pipeline.id: mypipeline
      path.config: "/usr/share/logstash/pipelines/mypipeline.conf"
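The pipeline file referenced in pipelines.yml is delivered through the logstash-pipelines volume. One way to package it, sketched here with an assumed ConfigMap name that matches the volume in the StatefulSet:

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-pipelines   # assumed name; must match the StatefulSet volume
data:
  mypipeline.conf: |-
    # the input/filter/output definition shown in the next section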
For this demo, we've implemented a single pipeline that extracts rows from MariaDB, transforms them, and loads them into OpenSearch:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mariadb-3.5.3.jar"
    jdbc_driver_class => "Java::org.mariadb.jdbc.Driver"
    jdbc_validate_connection => true
    jdbc_connection_string => "jdbc:mariadb://your-host:port/your_database"
    jdbc_user => "your_username"
    jdbc_password => "your_password"
    schedule => "*/1 * * * *" # Runs every minute
    statement => "
      SELECT
        a.id,
        a.name,
        a.description,
        b.info AS related_info
      FROM table_a a
      LEFT JOIN table_b b ON a.b_id = b.id
    "
    record_last_run => true
    sql_log_level => "debug"
    last_run_metadata_path => "/usr/share/logstash/jdbc_last_run_metadata_path/mariadb"
  }
}

filter {
  mutate {
    copy => { "id" => "[@metadata][_id]" }
    remove_field => ["@version", "@timestamp"]
  }
  json {
    source => "related_info"
    target => "related_info"
  }
}

output {
  opensearch {
    hosts => ["https://your-opensearch-host:9200"]
    index => "your_index_name"
    user => "your_opensearch_user"
    password => "your_opensearch_password"
    document_id => "%{[@metadata][_id]}"
    ssl => true
    cacert => "/usr/share/logstash/certs/ca.crt"
  }
}
Advanced JDBC Configuration
The pipeline includes advanced JDBC configuration:
sql_log_level => "debug"
record_last_run => true
last_run_metadata_path => "/usr/share/logstash/jdbc_last_run_metadata_path/mariadb"
These settings are crucial for:
Detailed SQL logging for troubleshooting
Tracking the last successful run to prevent duplicate processing
Maintaining state information between container restarts (see the persistence sketch below)
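For the last-run metadata to actually survive pod restarts, the configured path has to live on persistent storage. The StatefulSet above only persists /usr/share/logstash/data, so one option is to mount a directory from the same PVC at the metadata path. A sketch, where the subPath directory name is an assumption:

          # additional volumeMount on the logstash container, reusing the logstash-data PVC
          - name: logstash-data
            mountPath: /usr/share/logstash/jdbc_last_run_metadata_path
            subPath: jdbc_last_run_metadata_path

Alternatively, simply point last_run_metadata_path at a location under /usr/share/logstash/data.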
Part 2: Deploying OpenSearch
We use the OpenSearch Operator to deploy and manage our OpenSearch cluster on Kubernetes.
Installing the OpenSearch Operator
First, we install the OpenSearch Operator Helm chart, pulled in through Kustomize's helmCharts field:
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
helmCharts:
  - name: opensearch-operator
    releaseName: opensearch-operator
    repo: https://opensearch-project.github.io/opensearch-k8s-operator/
    version: 2.7.0
    valuesFile: ./values.yaml
    namespace: webapps-test
Our operator values.yaml contains:
manager:
  loglevel: info
  watchNamespace: test # limits the operator to the test namespace
useRoleBindings: true # since we are limiting to the test namespace, it's best to use namespace-scoped RoleBindings
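Because the operator chart is pulled in via the helmCharts field, the kustomization has to be rendered with Helm support enabled. One way to do this, run from the directory containing the kustomization.yaml above:

kustomize build --enable-helm . | kubectl apply -f -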
Deploying the OpenSearch Cluster
We create an OpenSearch cluster using the OpenSearchCluster custom resource:
apiVersion: opensearch.opster.io/v1
kind: OpenSearchCluster
metadata:
  name: opensearch
spec:
  general:
    version: 2.14.0
    httpPort: 9200
    vendor: opensearch
    serviceName: opensearch
  security:
    config:
      adminCredentialsSecret:
        name: admin-credentials-secret
      securityConfigSecret:
        name: securityconfig-secret
    tls:
      transport:
        generate: true
      http:
        generate: true
  dashboards:
    tls:
      enable: true
      generate: true
    opensearchCredentialsSecret:
      name: admin-credentials-secret
    version: 2.14.0
    enable: true
    replicas: 1
    resources:
      requests:
        memory: "1Gi"
        cpu: "500m"
      limits:
        memory: "1Gi"
        cpu: "500m"
  confMgmt:
    smartScaler: true
  nodePools:
    - component: masters
      replicas: 3
      diskSize: "30Gi"
      resources:
        requests:
          memory: "2Gi"
          cpu: "500m"
        limits:
          memory: "2Gi"
          cpu: "500m"
      roles:
        - "cluster_manager"
        - "data"
Note: The OpenSearch operator creates the Kubernetes resources, including certificates, services, StatefulSets, and pods for OpenSearch and OpenSearch Dashboards. Note the service it creates and point your Logstash output at it; the naming follows the value set for serviceName.
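To see what the operator created, list the resources in the namespace where the OpenSearchCluster was applied, for example:

kubectl get svc,statefulset,pod -n test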
Security Configuration
We create Kubernetes secrets for OpenSearch credentials and security configuration:
apiVersion: v1
kind: Secret
metadata:
  name: admin-credentials-secret
type: Opaque
stringData:
  username: admin
  password: demo_admin_password
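The Logstash overlay shown earlier includes a sealedsecrets.yaml, and the same approach works for these credentials: seal them rather than committing plain Secrets. A sketch, assuming kubeseal and the Sealed Secrets controller are installed and the Secret above is saved as admin-credentials-secret.yaml:

kubeseal --format yaml < admin-credentials-secret.yaml > sealedsecrets.yaml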
Our security configuration includes predefined users, roles, and role mappings:
internal_users.yml: |-
  _meta:
    type: "internalusers"
    config_version: 2
  admin:
    hash: "$2a$12$TwqwerwediakNUJa5C.xxxxxxxxxxxxxxxxxxxxxxxxx"
    reserved: true
    backend_roles:
      - "admin"
    description: "Admin user"
  dashboard_user:
    hash: "$2a$retrLlNfdiakNUJa5C.xxxxxxxxxxxxxxxxxxxxxxxxx"
    reserved: true
    description: "OpenSearch Dashboards user"
Use the following command to generate the user password hash:
python -c 'import bcrypt; print(bcrypt.hashpw("demo_admin_password".encode("utf-8"), bcrypt.gensalt(12, prefix=b"2a")).decode("utf-8"))'
Note: This is a demonstration setup.