Promtail + OpenObserve - Bridging the Gap


Overview

When using Promtail to collect Docker logs and send them to OpenObserve, you will run into a compatibility issue: Promtail sends data in Loki's native format (snappy-compressed protobuf), while OpenObserve expects a different JSON structure.

Use Case

This article was developed to solve a specific need: ingesting and monitoring logs from my own Docker containers running in a local development environment. Instead of relying on external logging services, I wanted to collect logs from all my containers and centralize them in OpenObserve.

This guide shows you how to create a proxy service that bridges this gap.

Intro

A quick check of Loki's documentation shows that Promtail sends data to the /loki/api/v1/push endpoint, not directly to OpenObserve.

Looking at the Loki push API, it sends data in this format:

{
  "streams": [
    {
      "stream": {
        "label": "value"
      },
      "values": [
        [ "<unix epoch in nanoseconds>", "<log line>" ]
      ]
    }
  ]
}

OpenObserve's /api/{org}/{stream}/_json endpoint expects a different JSON format, and it currently does not have a Loki-compatible endpoint, yet… (a β€œwink wink” to OpenObserve to consider building one in the future) πŸ˜‰πŸ˜‰
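For comparison, the _json endpoint takes a plain JSON array of flat records. The field names below simply mirror what the proxy built later in this guide will produce; they are not fixed names, since OpenObserve accepts arbitrary keys:

[
  {
    "timestamp": "2024-01-01T12:00:00Z",
    "message": "container started",
    "stream": "stdout",
    "job": "docker",
    "service": "promtail"
  }
]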

A common workaround for this format mismatch is to create a custom endpoint that receives data from Promtail and then forwards it to OpenObserve in the required format.

In the next steps we will create an HTTP proxy/adapter in Python and a Docker setup to run it.

Problem Statement

  • Promtail sends logs in Loki's format (protobuf/JSON) to /loki/api/v1/push

  • OpenObserve expects logs in its own JSON format at /api/{org}/{stream}/_json

  • The result? Direct connection fails with 400 Bad Request errors

  • The need? A translation layer to convert Loki format to OpenObserve format while preserving log metadata

Simple solution architecture

Docker Logs > Promtail > Loki Proxy (converts format) > OpenObserve

Our proxy will:

  1. Accept Loki-formatted data from Promtail

  2. Convert it to OpenObserve's expected JSON format

  3. Forward the converted data with proper authentication

Prerequisites

  • Docker installed and running

  • OpenObserve instance running (with credentials)

  • Basic knowledge of Docker and Python (the proxy uses the Flask and Requests libraries)

Step 1: Create the Proxy Service

Create Project Directory

mkdir loki-openobserve-proxy
cd loki-openobserve-proxy

NOTE: In my case, all of the files created in the following steps live inside this single directory.

Create the Python Proxy Script

Create a file named loki_openobserve_proxy.py:

#!/usr/bin/env python3
"""
Simple Loki to OpenObserve Proxy
This proxy accepts any format from Promtail and forwards to OpenObserve
"""

from flask import Flask, request, jsonify
import requests
import json
from datetime import datetime, timezone
import logging
import re

app = Flask(__name__)

# OpenObserve configuration
OPENOBSERVE_URL = "http://host.docker.internal:5080/api/{YOUR_ORG}/{YOUR_STREAM_NAME}/_json"
OPENOBSERVE_USERNAME = "<YOUR_USER_NAME>"
OPENOBSERVE_PASSWORD = "<YOUR_PASSWORD>"

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def extract_logs_from_protobuf(raw_data):
    """
    Extract log messages from protobuf data using regex
    This is a simple approach that looks for text patterns
    """
    try:
        # Convert to string, ignoring decode errors
        data_str = raw_data.decode('utf-8', errors='ignore')

        # Look for JSON-like log entries in the protobuf
        # Pattern to find log lines that look like: {"log":"message","stream":"stdout","time":"timestamp"}
        log_pattern = r'\{"[^"]*":"[^"]*"[^}]*\}'
        json_logs = re.findall(log_pattern, data_str)

        openobserve_logs = []

        for json_log in json_logs:
            try:
                log_entry = json.loads(json_log)
                if 'log' in log_entry:
                    # Convert to OpenObserve format
                    openobserve_log = {
                        'timestamp': log_entry.get('time', datetime.now(timezone.utc).isoformat()),
                        'message': log_entry.get('log', ''),
                        'stream': log_entry.get('stream', 'stdout'),
                        'job': 'docker',
                        'service': 'promtail'
                    }
                    openobserve_logs.append(openobserve_log)
            except json.JSONDecodeError:
                continue

        # If no JSON found, extract plain text messages
        if not openobserve_logs:
            # Look for readable text in the protobuf
            text_pattern = r'[a-zA-Z0-9\s\.\-_:/]{10,}'
            text_matches = re.findall(text_pattern, data_str)

            for text in text_matches:
                if len(text.strip()) > 5:  # Only meaningful text
                    openobserve_log = {
                        'timestamp': datetime.now(timezone.utc).isoformat(),
                        'message': text.strip(),
                        'stream': 'stdout',
                        'job': 'docker',
                        'service': 'promtail'
                    }
                    openobserve_logs.append(openobserve_log)

        return openobserve_logs

    except Exception as e:
        logger.error(f"Error extracting logs: {e}")
        return []

@app.route('/loki/api/v1/push', methods=['POST'])
def loki_push():
    """
    Accept any format from Promtail and forward to OpenObserve
    """
    try:
        logger.info(f"Received request from {request.remote_addr}")
        logger.info(f"Content-Type: {request.content_type}")
        logger.info(f"Content-Length: {request.content_length}")

        # Get raw data
        raw_data = request.get_data()
        logger.info(f"Raw data length: {len(raw_data)}")

        openobserve_logs = []

        # Try JSON first
        if request.content_type == 'application/json':
            try:
                loki_data = json.loads(raw_data)
                logger.info("Parsed as JSON successfully")

                for stream in loki_data.get('streams', []):
                    labels = stream.get('stream', {})
                    values = stream.get('values', [])

                    for timestamp_ns, log_line in values:
                        timestamp_seconds = int(timestamp_ns) / 1_000_000_000
                        iso_timestamp = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc).isoformat()

                        log_entry = {
                            'timestamp': iso_timestamp,
                            'message': log_line,
                            **labels
                        }
                        openobserve_logs.append(log_entry)

            except json.JSONDecodeError:
                logger.info("Not valid JSON, treating as protobuf")
                openobserve_logs = extract_logs_from_protobuf(raw_data)
        else:
            # Assume protobuf
            logger.info("Treating as protobuf data")
            openobserve_logs = extract_logs_from_protobuf(raw_data)

        if not openobserve_logs:
            logger.warning("No logs extracted from request")
            return '', 204  # Return success to avoid Promtail retries

        logger.info(f"Extracted {len(openobserve_logs)} log entries")

        # Forward to OpenObserve
        try:
            response = requests.post(
                OPENOBSERVE_URL,
                json=openobserve_logs,
                auth=(OPENOBSERVE_USERNAME, OPENOBSERVE_PASSWORD),
                headers={'Content-Type': 'application/json'},
                timeout=30
            )

            logger.info(f"OpenObserve response: {response.status_code}")

            if response.status_code == 200:
                logger.info("Successfully forwarded to OpenObserve")
                return '', 204
            else:
                logger.error(f"OpenObserve error: {response.status_code} - {response.text}")
                return '', 204  # Still return success to avoid retries

        except requests.RequestException as e:
            logger.error(f"Failed to forward to OpenObserve: {e}")
            return '', 204  # Still return success to avoid retries

    except Exception as e:
        logger.error(f"Request processing error: {e}")
        return '', 204  # Return success to avoid Promtail retries

@app.route('/health', methods=['GET'])
def health():
    return jsonify({"status": "healthy", "service": "simple-loki-proxy"})

@app.route('/', methods=['GET'])
def index():
    return jsonify({
        "service": "Simple Loki to OpenObserve Proxy",
        "endpoints": {
            "loki_push": "/loki/api/v1/push",
            "health": "/health"
        }
    })

if __name__ == '__main__':
    logger.info("Starting Simple Loki to OpenObserve Proxy")
    app.run(host='0.0.0.0', port=3100, debug=True)

Update Configuration

Edit the configuration section in loki_openobserve_proxy.py:

# Update these values for your OpenObserve instance
OPENOBSERVE_URL = "http://host.docker.internal:5080/api/{YOUR_ORG}/{YOUR_STREAM_NAME}/_json"
OPENOBSERVE_USERNAME = "<YOUR_USER_NAME>"
OPENOBSERVE_PASSWORD = "<YOUR_PASSWORD>"
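If you would rather not hardcode credentials in the script, a small optional tweak (my own suggestion, not part of the original proxy) is to read them from environment variables and pass them with docker run -e when you start the container in the next step. The variable names below are arbitrary:

# Optional: read the settings from the environment, falling back to the defaults above
import os

OPENOBSERVE_URL = os.environ.get(
    "OPENOBSERVE_URL",
    "http://host.docker.internal:5080/api/{YOUR_ORG}/{YOUR_STREAM_NAME}/_json",
)
OPENOBSERVE_USERNAME = os.environ.get("OPENOBSERVE_USERNAME", "<YOUR_USER_NAME>")
OPENOBSERVE_PASSWORD = os.environ.get("OPENOBSERVE_PASSWORD", "<YOUR_PASSWORD>")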

Step 2: Create Docker Container

Create Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install required packages
RUN pip install flask requests python-snappy

# Copy the proxy script
COPY loki_openobserve_proxy.py .

# Expose the port
EXPOSE 3100

# Run the proxy
CMD ["python", "loki_openobserve_proxy.py"]

Build the Docker Image

docker build -t loki-openobserve-proxy .

Run the Proxy Container

docker run -d --name loki-proxy -p 3100:3100 loki-openobserve-proxy

Test that the proxy is up:

curl http://localhost:3100/health

Expected response: {"service": "simple-loki-proxy", "status": "healthy"}
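Optionally, you can also exercise the conversion path by hand before involving Promtail. The short script below is just a smoke test I am adding for illustration (Promtail itself normally ships snappy-compressed protobuf, which the proxy handles via its fallback path); it posts one Loki-style JSON payload to the proxy, and the entry should then appear in OpenObserve:

#!/usr/bin/env python3
"""Smoke test: send one hand-crafted Loki-style payload to the proxy."""
import json
import time

import requests

# One stream with a single log line, in the Loki push format shown earlier
payload = {
    "streams": [
        {
            "stream": {"job": "docker", "service": "promtail"},
            "values": [[str(time.time_ns()), "hello from the smoke test"]],
        }
    ]
}

response = requests.post(
    "http://localhost:3100/loki/api/v1/push",
    data=json.dumps(payload),
    headers={"Content-Type": "application/json"},
    timeout=10,
)
print(response.status_code)  # the proxy replies 204 on success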

Step 3: Configure Promtail

Create Promtail Configuration

Create promtail-config.yaml in the same loki-openobserve-proxy directory:

server:
  http_listen_port: 9080
  grpc_listen_port: 0
  log_level: debug  # Increase to debug level to see HTTP requests

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://host.docker.internal:3100/loki/api/v1/push
    headers:
      Content-Type: application/json
    # Force JSON format instead of protobuf
    tenant_id: ""
    # Disable snappy compression
    external_labels:
      service: promtail

scrape_configs:
  - job_name: docker-logs
    static_configs:
      - targets:
          - localhost
        labels:
          job: docker
          __path__: /var/lib/docker/containers/*/*.log

Run Promtail

Adjust the first -v path so it points to the directory containing your promtail-config.yaml:

docker run --rm --name promtail \
  --network host \
  -v /home/leonardo/promtail:/mnt/config \
  -v /var/lib/docker/containers:/var/lib/docker/containers:ro \
  grafana/promtail:3.2.1 \
  --config.file=/mnt/config/promtail-config.yaml

Step 4: Verify the Setup

Check Proxy Logs

Monitor the proxy's activity:

docker logs -f loki-proxy

You should see something like:

INFO:__main__:Starting Simple Loki to OpenObserve Proxy
INFO:__main__:Received request from 172.17.0.1
INFO:__main__:Extracted 5 log entries
INFO:__main__:OpenObserve response: 200
INFO:__main__:Successfully forwarded to OpenObserve

Check Promtail Logs

docker logs -f promtail

Look for:

level=info msg="Adding target" key="/var/lib/docker/containers/*/*.log:{job=\"docker\"}"

Verify in OpenObserve

Finally, open the OpenObserve UI, select the stream you configured in the proxy, and confirm that the Docker log entries are arriving.

Conclusion

You now have a working Loki-compatible proxy that bridges Promtail and OpenObserve. This solution:

  • Accepts data from Promtail in native Loki format

  • Converts logs to OpenObserve's expected JSON structure

  • Handles both JSON and protobuf formats

  • Provides monitoring and debugging capabilities

Next Steps

  • Set up log retention policies in OpenObserve

  • Configure alerting based on log patterns

  • Add log parsing for structured logs (JSON logs)

  • Implement log filtering and routing

  • Set up monitoring for the proxy itself (see the sketch below)
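For the last item, a very small starting point (my own sketch, using only the /health endpoint the proxy already exposes) could be a periodic check like this:

#!/usr/bin/env python3
"""Poll the proxy's /health endpoint and warn when it stops responding."""
import time

import requests

PROXY_HEALTH_URL = "http://localhost:3100/health"  # adjust if the proxy runs elsewhere

while True:
    try:
        response = requests.get(PROXY_HEALTH_URL, timeout=5)
        if response.status_code == 200:
            print("proxy healthy:", response.json())
        else:
            print("proxy returned unexpected status:", response.status_code)
    except requests.RequestException as exc:
        print("proxy unreachable:", exc)
    time.sleep(60)  # check once a minute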

Now you know!!! πŸš€
