Data Lakehouse for "Big" Observability

Praneet Sharma

Usually, a single-tenant approach meets the observability needs of startups, and an isolated multi-tenant approach meets those of enterprises. Beyond running distributed queries across tenants, is there a graceful way to store data from all tenants for very long periods of time? Can that data be stored cost-effectively, nicely partitioned, and compressed?

Object Storage

Object storage offers a "bottomless" store that many writers and readers can use concurrently, which suits batches of data such as otlp_json files. The awss3exporter in OTel Collector Contrib is a convenient way to batch-write to S3.

An example config:

receivers:
  otlp:
    protocols:
      http:
        endpoint: '0.0.0.0:4318'

processors:
  batch:

exporters:
  awss3:
    s3uploader:
      region: "us-west-2"
      s3_bucket: "omlet-logs"
      # root “directory” inside the bucket; you can change "logs" to whatever prefix you like
      s3_prefix: "logs"
      # partition by year/month/day/hour/minute
      s3_partition_format: "year=%Y/month=%m/day=%d/hour=%H/minute=%M"
      # gzip compression is optional
      compression: gzip
    sending_queue:
      enabled: true
      num_consumers: 5
      queue_size: 50
    timeout: 10s
  debug:

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug, awss3]

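The above configuration receives OTLP data over HTTP (in this example, only logs), batches it, and writes it to an S3 bucket as gzip-compressed files partitioned down to the minute. While straightforward, this setup makes the data readable by any engine that can reach S3.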
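
To make the resulting layout concrete, the sketch below maps a timestamp to the key prefix this configuration would produce. It assumes the exporter joins s3_prefix and the rendered s3_partition_format with "/" and renders the partition in UTC; partition_prefix is a hypothetical helper for illustration, not part of the exporter.

```python
from datetime import datetime, timezone

# Same strftime-style pattern as s3_partition_format in the collector config.
PARTITION_FORMAT = "year=%Y/month=%m/day=%d/hour=%H/minute=%M"

def partition_prefix(ts: datetime, root: str = "logs") -> str:
    """Return the assumed S3 key prefix for the minute containing `ts`."""
    # Assumption: partitions are rendered in UTC.
    utc = ts.astimezone(timezone.utc)
    return f"{root}/{utc.strftime(PARTITION_FORMAT)}/"

example = partition_prefix(datetime(2025, 1, 2, 3, 4, 59, tzinfo=timezone.utc))
print(example)  # logs/year=2025/month=01/day=02/hour=03/minute=04/
```

Hive-style key=value path segments like these are what let a reader narrow a query to a time range by prefix alone.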

Lakehouse DB

OLAP databases such as DuckDB and ClickHouse (among others) can easily connect to this storage system. We use chDB (embedded ClickHouse) to load and query the data from this object storage.

import os
import json
import gzip
import logging
import boto3
from datetime import datetime, timedelta
import chdb
from itertools import islice
from concurrent.futures import ThreadPoolExecutor, as_completed

# ───────────────────────────────
# Configure logging
# ───────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("otel_ingest")

# ───────────────────────────────
# AWS creds
# ───────────────────────────────
assert os.environ.get("AWS_ACCESS_KEY_ID"), "Missing AWS_ACCESS_KEY_ID"
assert os.environ.get("AWS_SECRET_ACCESS_KEY"), "Missing AWS_SECRET_ACCESS_KEY"
log.info("AWS credentials loaded")

# ───────────────────────────────
# List S3 files for last day
# ───────────────────────────────
s3 = boto3.client("s3", region_name="us-west-2")
# Timezone-aware "now"; boto3 returns LastModified as an aware UTC datetime,
# so the two compare correctly without stripping tzinfo.
now = datetime.now().astimezone()
start = now - timedelta(days=1)
paginator = s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket="omlet-logs", Prefix="logs/")

all_keys = [
    obj["Key"]
    for page in pages
    for obj in page.get("Contents", [])
    if start <= obj["LastModified"] <= now and obj["Key"].endswith(".json.gz")
]
log.info(f"Total files found: {len(all_keys)}")
if not all_keys:
    log.info("Nothing to ingest, exiting")
    raise SystemExit(0)

# ───────────────────────────────
# Create target table
# ───────────────────────────────
conn = chdb.connect()
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS otel_logs (
    Timestamp           DateTime64(9),
    TraceId             String,
    SpanId              String,
    TraceFlags          UInt8,
    SeverityText        String,
    SeverityNumber      UInt8,
    ServiceName         String,
    Body                String,
    ResourceAttributes  Map(String, String),
    ScopeAttributes     Map(String, String),
    LogAttributes       Map(String, String)
) ENGINE = Memory
""")
log.info("Table otel_logs ready")

# ───────────────────────────────
# Helper to process a single file → list of dicts
# ───────────────────────────────
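def any_value_to_str(value):
    # An OTLP AnyValue is a one-key dict (stringValue, intValue, boolValue, ...).
    # Strings pass through; anything else is JSON-encoded so it fits Map(String, String).
    v = next(iter(value.values()), "") if value else ""
    return v if isinstance(v, str) else json.dumps(v)

def attrs_to_map(attrs):
    return {a["key"]: any_value_to_str(a.get("value")) for a in attrs}

def parse_file(key):
    log.info(f"Starting download of {key}")
    blob = s3.get_object(Bucket="omlet-logs", Key=key)["Body"].read()
    log.info(f"Downloaded {key} ({len(blob)} bytes)")
    payload = json.loads(gzip.decompress(blob))
    entries = payload.get("resourceLogs", [])
    log.info(f"Parsed JSON for {key}, found {len(entries)} resourceLogs entries")

    out = []
    for r in entries:
        res_attrs = attrs_to_map(r.get("resource", {}).get("attributes", []))
        for sl in r.get("scopeLogs", []):
            scope = sl.get("scope", {})
            scope_attrs = attrs_to_map(scope.get("attributes", []))
            # Prefer the service.name resource attribute; fall back to the scope name
            service = res_attrs.get("service.name") or scope.get("name", "")
            for logrec in sl.get("logRecords", []):
                # Integer math keeps all nine fractional digits for DateTime64(9)
                secs, nanos = divmod(int(logrec.get("timeUnixNano", 0)), 1_000_000_000)
                ts = (datetime(1970, 1, 1) + timedelta(seconds=secs)).strftime("%Y-%m-%d %H:%M:%S") + f".{nanos:09d}"
                rec = {
                    "Timestamp": ts,
                    "TraceId": logrec.get("traceId", ""),
                    "SpanId": logrec.get("spanId", ""),
                    # OTLP JSON names this field "flags"; the low 8 bits are the trace flags
                    "TraceFlags": int(logrec.get("flags", 0)) & 0xFF,
                    "SeverityText": logrec.get("severityText", ""),
                    "SeverityNumber": logrec.get("severityNumber", 0),
                    "ServiceName": service,
                    "Body": any_value_to_str(logrec.get("body")),
                    "ResourceAttributes": res_attrs,
                    "ScopeAttributes": scope_attrs,
                    "LogAttributes": attrs_to_map(logrec.get("attributes", [])),
                }
                out.append(rec)
    log.info(f"Processed {key}, constructed {len(out)} records")
    return out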

# ───────────────────────────────
# Chunking utilities
# ───────────────────────────────
def chunked(iterable, size):
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            break
        yield batch

batch_size = 100
row_batch_size = 100000  # JSONEachRow batch size
total = 0

# ───────────────────────────────
# Main loop: batches of files
# ───────────────────────────────
for batch_keys in chunked(all_keys, batch_size):
    log.info(f"Processing batch of {len(batch_keys)} files")
    # parallel parse → list of dicts
    rows_accum = []
    with ThreadPoolExecutor(max_workers=8) as ex:
        futures = {ex.submit(parse_file, key): key for key in batch_keys}
        for fut in as_completed(futures):
            rows_accum.extend(fut.result())

    # bulk insert in JSONEachRow chunks
    for chunk in chunked(rows_accum, row_batch_size):
        payload = "\n".join(json.dumps(r, default=str) for r in chunk)
        sql = "INSERT INTO otel_logs FORMAT JSONEachRow\n" + payload
        cur.execute(sql)
        log.info(f"Inserted JSONEachRow batch of {len(chunk)} rows")
        total += len(chunk)

log.info(f"Total rows inserted: {total}")

# Example select query
import pandas as pd
df = pd.read_sql("SELECT SeverityText, count() AS cnt FROM otel_logs GROUP BY SeverityText", conn)
log.info("Counts by severity:\n" + df.to_string(index=False))

conn.close()
log.info("Done.")

In the code above, we load a series of compressed (gzip) otlp_json files from a specific time window (the past day), create an in-memory table, insert rows in batches, and query for counts by severity level.
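
The script lists every key under logs/ and filters on LastModified, which gets slower as the bucket grows. Because the keys are partitioned by time, the listing can instead be restricted to the hours that overlap the window. The following is a minimal sketch, assuming the year=/month=/day=/hour= layout from the collector config and UTC partitions; hourly_prefixes is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

def hourly_prefixes(start: datetime, end: datetime, root: str = "logs") -> list[str]:
    """List the hour-level key prefixes that overlap [start, end]."""
    # Truncate to the top of the hour so a partially covered first hour is included.
    cursor = start.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
    end = end.astimezone(timezone.utc)
    prefixes = []
    while cursor <= end:
        prefixes.append(f"{root}/{cursor.strftime('year=%Y/month=%m/day=%d/hour=%H')}/")
        cursor += timedelta(hours=1)
    return prefixes

window_end = datetime(2025, 1, 2, 1, 30, tzinfo=timezone.utc)
window_start = window_end - timedelta(hours=2)
for prefix in hourly_prefixes(window_start, window_end):
    print(prefix)
```

Each prefix can then be passed as Prefix= to the list_objects_v2 paginator in place of the single logs/ prefix. Objects in the first and last hour may still need a timestamp filter.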

Going “Wide”

The code above shows a simple way to query using one worker. However, the advantage of object storage is that it allows multiple workers to access partitions in the bucket. For example:

  1. Have each worker read one minute of log/trace data from the past hour, then aggregate the results across workers.

  2. Partition by "Service" and "Time" to query high-resolution data at scale during live incidents.
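
The first pattern can be sketched as a fan-out over minute partitions followed by a merge. This is an illustration under assumptions, not a description of Omlet's implementation: count_severities is a hypothetical per-partition function (for example, one that downloads the files under a prefix and counts records by severity), fake_count is a stand-in with made-up numbers, and threads stand in for what would be separate workers or machines at larger scale.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from typing import Callable

def minute_prefixes(end: datetime, minutes: int = 60, root: str = "logs") -> list[str]:
    """Key prefixes for the `minutes` one-minute partitions ending at `end`."""
    end = end.astimezone(timezone.utc).replace(second=0, microsecond=0)
    fmt = "year=%Y/month=%m/day=%d/hour=%H/minute=%M"
    return [
        f"{root}/{(end - timedelta(minutes=i)).strftime(fmt)}/"
        for i in range(minutes - 1, -1, -1)
    ]

def fan_out(prefixes: list[str],
            count_severities: Callable[[str], Counter],
            workers: int = 8) -> Counter:
    """Run one task per partition and merge the partial counts."""
    total = Counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for partial in pool.map(count_severities, prefixes):
            total.update(partial)  # Counter.update adds counts
    return total

# Stand-in worker so the sketch runs without S3 access (hypothetical data).
def fake_count(prefix: str) -> Counter:
    return Counter({"INFO": 10, "ERROR": 1})

prefixes = minute_prefixes(datetime(2025, 1, 2, 3, 4, tzinfo=timezone.utc), minutes=3)
totals = fan_out(prefixes, fake_count)
print(prefixes[0], prefixes[-1])
print(totals)
```

Because each task touches only its own prefix, the partial results are independent and merging them is a simple sum. The second pattern would add a service dimension to the prefix in the same way.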

Our vision at Omlet is to use OTel as a tool to create the best Observability Lakehouse experience.
