Building a Scalable Data Pipeline with Logstash and OpenSearch on Kubernetes

Ucheagwu Onyike
4 min read

In today's data-driven world, organizations need robust pipelines to collect, process, and analyze their data efficiently. This blog post walks through implementing a scalable data pipeline using Logstash and OpenSearch on Kubernetes, a good fit for organizations looking to enhance their data capabilities with open-source solutions.

Architecture Overview

Our architecture leverages three key components:

  1. Data Source: A MariaDB database

  2. Data Processing: Custom Logstash pipelines for ETL (Extract, Transform, Load) operations

  3. Data Storage & Analysis: OpenSearch cluster with Dashboards for visualization

Here's how data flows through the system:

MariaDB Database → Logstash Pipelines → OpenSearch Cluster → OpenSearch Dashboards

Part 1: Setting Up the Logstash Pipeline

Custom Logstash Image

We start by creating a custom Logstash Docker image that includes the OpenSearch output plugin and MariaDB JDBC driver:

# Start from the official Logstash image
FROM docker.elastic.co/logstash/logstash:9.0.0
# Add the OpenSearch output plugin (not bundled with the Elastic image)
RUN logstash-plugin install logstash-output-opensearch
# Bundle the MariaDB Connector/J JDBC driver where the jdbc input can load it
COPY --chown=logstash:logstash bin/mariadb-3.5.3.jar /usr/share/logstash/logstash-core/lib/jars/mariadb-3.5.3.jar
CMD ["logstash"]

This image is pushed to Docker Hub for use in our Kubernetes deployment.
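Building and pushing it looks something like this (mycustomlogstash/logstash-custom matches the image referenced in the StatefulSet below; substitute your own repository):

# Build the custom image and push it to Docker Hub
docker build -t mycustomlogstash/logstash-custom:latest .
docker push mycustomlogstash/logstash-custom:latest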

Kubernetes Configuration

We use Kustomize to manage our Kubernetes configuration with a base/overlay structure:

.
├── Dockerfile
├── bin
│   └── mariadb-3.5.3.jar
├── k8s
│   ├── base
│   │   ├── configmap.yaml
│   │   ├── kustomization.yaml
│   │   ├── service.yaml
│   │   └── statefulset.yaml
│   └── overlay
│       └── test
│           ├── kustomization.yaml
│           ├── sealedsecrets.yaml
│           └── version.yaml
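As a rough sketch, the base kustomization.yaml just lists the base resources (its exact contents aren't shown in this post, so treat this as an assumption):

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - configmap.yaml
  - service.yaml
  - statefulset.yaml

The test overlay then layers the sealed secrets and image version on top, so the whole stack deploys with kubectl apply -k k8s/overlay/test.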

Our StatefulSet configuration includes:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: logstash
  labels:
    app: logstash
spec:
  serviceName: logstash
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      securityContext:
        runAsUser: 1000
        runAsGroup: 1000
        fsGroup: 1000
      containers:
      - name: logstash
        image: mycustomlogstash/logstash-custom:latest
        imagePullPolicy: Always
        ports:
        - containerPort: 5000
        env:
        - name: CONFIG_SUPPORT_ESCAPE
          value: "true"
        - name: ELASTICSEARCH_URL
          value: "https://opensearch-endpoint:9200"
        volumeMounts:
        - name: logstash-data
          mountPath: /usr/share/logstash/data
        - name: opensearch-ca
          mountPath: /usr/share/logstash/certs/
          readOnly: true
        - name: logstash-config
          mountPath: /usr/share/logstash/config/pipelines.yml
          readOnly: true
          subPath: pipelines.yml
        - name: logstash-pipelines
          mountPath: /usr/share/logstash/pipelines
          readOnly: true
      volumes:
      # The ConfigMap names and the CA secret name below are assumptions
      # based on the other manifests shown in this post
      - name: logstash-config
        configMap:
          name: logstash-config
      - name: logstash-pipelines
        configMap:
          name: logstash-pipelines
      - name: opensearch-ca
        secret:
          secretName: opensearch-ca  # CA generated by the OpenSearch operator
  volumeClaimTemplates:
  - metadata:
      name: logstash-data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 5Gi

We expose Logstash inside the cluster with a ClusterIP Service:

apiVersion: v1
kind: Service
metadata:
  name: logstash
spec:
  selector:
    app: logstash
  ports:
    - protocol: TCP
      port: 5000
      targetPort: 5000
  type: ClusterIP

Pipeline Configuration

Our ConfigMap configuration includes:

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-config
data:
  pipelines.yml: |-
    - pipeline.id: mypipeline
      path.config: "/usr/share/logstash/pipelines/mypipeline.conf"
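The StatefulSet also mounts a logstash-pipelines volume for the pipeline definitions themselves. A minimal sketch of the ConfigMap that could back it (the name logstash-pipelines is an assumption; its contents are the pipeline shown below):

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-pipelines
data:
  mypipeline.conf: |-
    # input/filter/output blocks as shown below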

For this demo, we've implemented a single pipeline that extracts a joined query from MariaDB, reshapes the rows, and loads them into OpenSearch:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mariadb-3.5.3.jar"
    jdbc_driver_class => "Java::org.mariadb.jdbc.Driver"
    jdbc_validate_connection => true
    jdbc_connection_string => "jdbc:mariadb://your-host:port/your_database"
    jdbc_user => "your_username"
    jdbc_password => "your_password"
    schedule => "*/1 * * * *"  # Runs every minute
    statement => "
      SELECT 
        a.id,
        a.name,
        a.description,
        b.info AS related_info
      FROM table_a a
      LEFT JOIN table_b b ON a.b_id = b.id
    "
    record_last_run => true
    sql_log_level => "debug"
    last_run_metadata_path => "/usr/share/logstash/jdbc_last_run_metadata_path/mariadb"
  }
}

filter {
  mutate {
    # Use the row id as the document id so re-runs update instead of duplicating
    copy => { "id" => "[@metadata][_id]" }
    remove_field => ["@version", "@timestamp"]
  }
  # related_info arrives as a JSON string; parse it into a structured field
  json {
    source => "related_info"
    target => "related_info"
  }
}

output {
  opensearch {
    hosts => ["https://your-opensearch-host:9200"]
    index => "your_index_name"
    user => "your_opensearch_user"
    password => "your_opensearch_password"
    document_id => "%{[@metadata][_id]}"
    ssl => true
    cacert => "/usr/share/logstash/certs/ca.crt"
  }
}
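Before letting the schedule fire against a live database, the pipeline syntax can be validated from inside the pod. A sketch, assuming the single-replica StatefulSet above (which yields a pod named logstash-0):

kubectl exec -it logstash-0 -- bin/logstash --config.test_and_exit \
  -f /usr/share/logstash/pipelines/mypipeline.conf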

Advanced JDBC Configuration

The pipeline includes advanced JDBC configuration:

sql_log_level => "debug"
record_last_run => true
last_run_metadata_path => "/usr/share/logstash/jdbc_last_run_metadata_path/mariadb"

These settings are crucial for:

  • Detailed SQL logging for troubleshooting

  • Tracking the last successful run to prevent duplicate processing

  • Maintaining state information between container restarts (see the mount sketch after this list)
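One caveat: as written, last_run_metadata_path points outside the /usr/share/logstash/data PVC mount, so the marker file lives on the container's writable layer and is lost when the pod is rescheduled. A sketch of an extra mount that reuses the logstash-data PVC from the StatefulSet above:

        volumeMounts:
        - name: logstash-data
          mountPath: /usr/share/logstash/jdbc_last_run_metadata_path
          subPath: jdbc_last_run  # keep the JDBC run marker on persistent storage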

Part 2: Deploying OpenSearch

We use the OpenSearch Operator to deploy and manage our OpenSearch cluster on Kubernetes.

Installing the OpenSearch Operator

First, we install the OpenSearch Operator's Helm chart via Kustomize's helmCharts generator:

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

helmCharts:
  - name: opensearch-operator
    releaseName: opensearch-operator
    repo: https://opensearch-project.github.io/opensearch-k8s-operator/
    version: 2.7.0
    valuesFile: ./values.yaml
    namespace: webapps-test
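Because this uses the helmCharts field, Helm integration has to be enabled at build time; deploying it looks something like:

kustomize build --enable-helm . | kubectl apply -f -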

Our operator values.yaml contains:

manager:
  loglevel: info
  watchNamespace: test # limits the operator to this namespace
useRoleBindings: true # since we're scoping to the test namespace, namespace-scoped RoleBindings are preferable
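Once applied, the operator pod should come up in the target namespace before we move on (exact pod names depend on the chart release):

kubectl -n webapps-test get pods -w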

Deploying the OpenSearch Cluster

We create an OpenSearch cluster using the OpenSearchCluster custom resource:

apiVersion: opensearch.opster.io/v1
kind: OpenSearchCluster
metadata:
  name: opensearch
spec:
  general:
    version: 2.14.0
    httpPort: 9200
    vendor: opensearch
    serviceName: opensearch
  security:
    config:
      adminCredentialsSecret:
        name: admin-credentials-secret
      securityConfigSecret:
        name: securityconfig-secret
    tls:
      transport:
        generate: true
      http:
        generate: true
  dashboards:
    tls:
      enable: true
      generate: true
    opensearchCredentialsSecret:
      name: admin-credentials-secret
    version: 2.14.0
    enable: true
    replicas: 1
    resources:
      requests:
        memory: "1Gi"
        cpu: "500m"
      limits:
        memory: "1Gi"
        cpu: "500m"
  confMgmt:
    smartScaler: true
  nodePools:
    - component: masters
      replicas: 3
      diskSize: "30Gi"
      resources:
        requests:
          memory: "2Gi"
          cpu: "500m"
        limits:
          memory: "2Gi"
          cpu: "500m"
      roles:
        - "cluster_manager"
        - "data"
Note: The OpenSearch operator creates the Kubernetes resources, including certificates, Services, StatefulSets, and pods for OpenSearch and OpenSearch Dashboards. Take note of the Service it creates and use that endpoint in your Logstash deployment; the naming follows the value set for serviceName.
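With serviceName: opensearch and the webapps-test namespace used for the operator install, the Logstash endpoint env var would look something like this (the in-cluster DNS name is an assumption derived from those values):

        - name: ELASTICSEARCH_URL
          value: "https://opensearch.webapps-test.svc.cluster.local:9200"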

Security Configuration

We create Kubernetes secrets for OpenSearch credentials and security configuration:

apiVersion: v1
kind: Secret
metadata:
  name: admin-credentials-secret
type: Opaque
stringData:
  username: admin
  password: demo_admin_password
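Since the test overlay ships a sealedsecrets.yaml, a credentials secret like this would normally be sealed before being committed. A sketch, assuming the Sealed Secrets controller is installed and the secret above is saved as admin-credentials-secret.yaml (a hypothetical file name):

kubeseal --format yaml < admin-credentials-secret.yaml > sealedsecrets.yaml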

Our security configuration, stored in the securityconfig-secret referenced by the cluster spec, includes predefined users, roles, and role mappings:

internal_users.yml: |-
  _meta:
    type: "internalusers"
    config_version: 2
  admin:
    hash: "$2a$12$TwqwerwediakNUJa5C.xxxxxxxxxxxxxxxxxxxxxxxxx"
    reserved: true
    backend_roles:
    - "admin"
    description: "Admin user"
  dashboard_user:
    hash: "$2a$retrLlNfdiakNUJa5C.xxxxxxxxxxxxxxxxxxxxxxxxx"
    reserved: true
    description: "OpenSearch Dashboards user"

Use the following command to generate a user password hash for internal_users.yml:

python -c 'import bcrypt; print(bcrypt.hashpw("demo_admin_password".encode("utf-8"), bcrypt.gensalt(12, prefix=b"2a")).decode("utf-8"))'

Note: This is a demonstration setup. For production use, replace the demo credentials and keep secrets out of plaintext manifests (for example, via the sealed secrets in the overlay above).
