Why Building ML Pipelines in Golang Is the Best Thing

Madiha Nazir
9 min read

Architecture Overview

Here’s what the system does to detect intrusions:

  1. Captures network packets (using gopacket)

  2. Aggregates flows (grouping packets by source/destination)

  3. Computes metrics (packet sizes, timings, etc.)

  4. Runs ML inference (using ONNX models)

  5. Logs results (PostgreSQL)

  6. Serves data via API (Fiber + React frontend)

Now, let’s see how Go makes this efficient!


πŸ” Packet β†’ Inference Pipeline in Go

  • Packet Capture (gopacket/pcap)

πŸ“Œ File: main.go β†’ processPackets()
πŸ“Œ Library: github.com/google/gopacket

When a packet arrives (say, on eth0), Go reads it using:

handle, err := pcap.OpenLive(interfaceName, snapshotLen, promiscuous, timeout)
if err != nil {
    log.Fatal(err)
}
defer handle.Close()

packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
for packet := range packetSource.Packets() {
    processPacket(packet) // Handles each packet
}

Key Concept: Goroutines handle packets concurrently without blocking.


  • Flow Key Generation (map[string]*FlowMetrics)

πŸ“Œ File: utils.go β†’ getCanonicalFlowKey()

Each packet is grouped into a flow (a TCP/UDP session). We generate a canonical key so that both directions of a connection map to the same flow:

func getCanonicalFlowKey(packet gopacket.Packet) string {
    // Extract srcIP, srcPort, dstIP, dstPort from the packet's layers
    a := fmt.Sprintf("%s:%d", srcIP, srcPort)
    b := fmt.Sprintf("%s:%d", dstIP, dstPort)
    if a > b {
        a, b = b, a // order endpoints so both directions share one key
    }
    return a + "-" + b
}

βœ” Key Concept: Go maps store flows efficiently.


  • Flow Metrics Update (Concurrency-safe!)

πŸ“Œ File: flows.go β†’ Global map[string]*FlowMetrics

We track:

  • Packet lengths

  • Timestamps

  • Packet counts

But since multiple goroutines update this map, we use sync.RWMutex to prevent race conditions:

var flows = make(map[string]*FlowMetrics)
var flowsMutex sync.RWMutex

func updateFlow(key string, packetLen int) {
    flowsMutex.Lock()
    defer flowsMutex.Unlock()

    flow, exists := flows[key]
    if !exists {
        flow = NewFlowMetrics() // Create new flow
        flows[key] = flow
    }
    flow.Update(packetLen) // Record this packet in the flow
}

βœ” Key Concept: Mutexes ensure thread-safe map operations.


  • Inference Trigger (Every 10 Seconds)

πŸ“Œ File: main.go β†’ inferenceLoop()

Every 10 seconds, we:

  1. Take a snapshot of flows (to avoid race conditions)

  2. Compute final metrics (mean, std deviation, packet rate, etc.)

  3. Prepare feature matrix ([][]float64) for ML models

func inferenceLoop() {
    for range time.Tick(10 * time.Second) {
        flowsMutex.RLock()
        snapshot := copyFlows(flows) // Safe copy
        flowsMutex.RUnlock()

        features := prepareFeatureMatrix(snapshot)
        runInference(features) // Pass to ML model
    }
}

βœ” Key Concept: time.Tick for periodic tasks (fine here since the loop runs for the program’s lifetime; use time.NewTicker if you need to stop it).


  • Preprocessing (Clean & Normalize Data)

πŸ“Œ File: utils.go β†’ preprocessFeatures()

The Challenge

  • ML models need normalized data (mean=0, std=1).

  • In Python, we’d use sklearn.preprocessing.

Go Implementation

Before feeding data to the ML model, we:

  1. Replace Inf β†’ NaN.

  2. Fill missing values (median imputation).

  3. Standardize (Z-score scaling).

βœ” Key Concept: Data pipelines in Go are fast (no Python bottlenecks!).



  • Logging to PostgreSQL

πŸ“Œ File: inference.go

We log:

  • flow_id

  • anomaly_score

  • attack_type (if any)

Best Practices

  1. Use connection pooling (avoid opening/closing per query).

  2. Use prepared statements (faster repeated queries).

_, err := db.Exec(`
    INSERT INTO detections 
    (flow_id, score, status, attack_type) 
    VALUES ($1, $2, $3, $4)`,
    flowID, score, status, attackType,
)

net/http vs Fiber: Choosing an API Framework

  • Option 1: Standard net/http

    • Simple, no dependencies.

    • Manual routing (http.HandleFunc).

    http.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) {
        logs := fetchLogsFromDB()
        json.NewEncoder(w).Encode(logs)
    })
    http.ListenAndServe(":8080", nil)

Option 2: Fiber (Express-like API)

  • Faster (built on FastHTTP).

  • Middleware support (auth, logging).

    app := fiber.New()
    app.Get("/logs", func(c *fiber.Ctx) error {
        logs := fetchLogsFromDB()
        return c.JSON(logs)
    })
    app.Listen(":8080")

Key Takeaways

βœ” net/http β†’ Good for small APIs.
βœ” Fiber β†’ Better for complex routing/middleware.


  • Frontend (React)

The frontend polls /logs every few seconds and displays:

  • Attack alerts

  • Flow details

  • Network throughput


Key Go Concepts Used

βœ… Goroutines β†’ Concurrent packet processing
βœ… sync.RWMutex β†’ Thread-safe map access
βœ… net/http / Fiber β†’ REST API
βœ… ONNX Runtime β†’ ML inference in Go
βœ… Channels & signal.Notify β†’ Graceful shutdown



Now, let’s dig deeper into the key Golang concepts that made this project efficient, scalable, and production-ready.

If you’re learning Go (like me!), understanding this will help you write better concurrent, performant, and maintainable systems.


1️⃣ Goroutines: Lightweight Concurrency

What They Do

  • Goroutines are lightweight threads managed by the Go runtime.

  • They enable concurrent execution without the overhead of OS threads.

How I Used Them

  • Packet processing (processPackets()) runs in one goroutine.

  • Inference loop (inferenceLoop()) runs in another.

  • API server (startAPIServer()) runs independently.

func main() {
    go processPackets()     // Handles live packet capture
    go inferenceLoop()      // Runs ML every 10 seconds
    go startAPIServer()     // Serves REST API

    // Block until SIGINT/SIGTERM
    stop := make(chan os.Signal, 1)
    signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
    <-stop
}

Key Takeaways

βœ” No manual thread management (unlike C++/Java).
βœ” Efficient (can spawn thousands without high memory cost).
βœ” Use go keyword to run any function concurrently.


2️⃣ sync.RWMutex: Safe Concurrent Map Access

The Problem

  • Multiple goroutines (packet processor + inference loop) access the same map[string]*FlowMetrics.

  • Race conditions can occur if not synchronized.

The Solution: Mutexes

  • sync.RWMutex allows:

    • Multiple readers (concurrent reads when no writer).

    • Exclusive lock for writes.

var flows = make(map[string]*FlowMetrics)
var flowsMutex sync.RWMutex

// Safe read (for inference)
flowsMutex.RLock()
defer flowsMutex.RUnlock()
flow, exists := flows[key]

// Safe write (packet processing)
flowsMutex.Lock()
defer flowsMutex.Unlock()
flows[key] = newFlow

Key Takeaways

βœ” RLock() β†’ For read-heavy operations (e.g., inference).
βœ” Lock() β†’ For writes (e.g., updating flows).
βœ” Always defer Unlock() to prevent deadlocks.

Example: Running Two Tasks Concurrently

func printNumbers() {
    for i := 1; i <= 5; i++ {
        fmt.Println(i)
        time.Sleep(100 * time.Millisecond)
    }
}

func printLetters() {
    for c := 'a'; c <= 'e'; c++ {
        fmt.Printf("%c\n", c)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    go printNumbers() // Runs in background
    go printLetters() // Runs in background

    // Wait to prevent program from exiting
    time.Sleep(1 * time.Second)
}

Output (interleaved):

1  
a  
2  
b  
3  
c  
...

βœ” Key Idea: go func() runs func() in a new goroutine.


Channels: Communicating Between Goroutines

Since goroutines run independently, we need a way to share data safely.

  • Channels are like thread-safe pipes for sending/receiving data.

Example: Sending Data Between Goroutines

func worker(jobs <-chan int, results chan<- int) {
    for job := range jobs {
        results <- job * 2 // Send result back
    }
}

func main() {
    jobs := make(chan int, 5)
    results := make(chan int, 5)

    go worker(jobs, results) // Start worker

    // Send jobs
    for i := 1; i <= 3; i++ {
        jobs <- i
    }
    close(jobs) // No more jobs

    // Collect results
    for i := 1; i <= 3; i++ {
        fmt.Println(<-results)
    }
}

Output:

2  
4  
6

βœ” Key Idea:

  • jobs <- i sends data to the channel.

  • <-results receives data from the channel.


The Problem: Race Conditions

What Happens Without Synchronization?

If multiple goroutines access shared data, bad things happen:

  • Example: Two goroutines updating the same map.
var counter = 0

func increment() {
    for i := 0; i < 1000; i++ {
        counter++ // ❌ Unsafe!
    }
}

func main() {
    go increment()
    go increment()
    time.Sleep(1 * time.Second)
    fmt.Println(counter) // Might print 1200 instead of 2000!
}

Why?

  • counter++ is not atomic (read β†’ modify β†’ write).

  • Goroutines interfere, leading to lost updates.


The Solution: Mutexes (sync.Mutex & sync.RWMutex)

What is a Mutex?

  • "Mutual Exclusion Lock" β†’ Only one goroutine can hold it at a time.

  • Others wait until it’s unlocked.

Using sync.Mutex

var counter = 0
var mutex sync.Mutex

func increment() {
    for i := 0; i < 1000; i++ {
        mutex.Lock()   // πŸ”’ Lock
        counter++      // βœ… Safe update
        mutex.Unlock() // πŸ”“ Unlock
    }
}

βœ” Now counter will always be 2000.


Optimizing with sync.RWMutex (Read-Write Mutex)

  • Problem: sync.Mutex is too strict if we have:

    • Many readers (goroutines that only read data).

    • Few writers (goroutines that modify data).

How sync.RWMutex Works

| Operation | Effect |
| --- | --- |
| Lock() | πŸ”’ Exclusive lock (only 1 writer) |
| Unlock() | πŸ”“ Release writer lock |
| RLock() | πŸ”’ Shared lock (many readers) |
| RUnlock() | πŸ”“ Release reader lock |

Example: Safe Map Access

var flows = make(map[string]int)
var flowsMutex sync.RWMutex

// Writer (updates flow)
func updateFlow(key string) {
    flowsMutex.Lock()         // πŸ”’ Exclusive lock
    defer flowsMutex.Unlock() // πŸ”“ Unlock when done
    flows[key]++
}

// Reader (gets flow stats)
func getFlow(key string) int {
    flowsMutex.RLock()         // πŸ”’ Shared lock (many can read)
    defer flowsMutex.RUnlock() // πŸ”“ Release read lock
    return flows[key]
}

βœ” Key Idea:

  • Writers use Lock() β†’ block all others.

  • Readers use RLock() β†’ allow concurrent reads.


Real-World Example: Flow Aggregation in My Project

Problem:

  • Packet processor (updates flows in map).

  • Inference loop (reads flows every 10s).

  • Race condition risk!

Solution: sync.RWMutex

var flows = make(map[string]*FlowMetrics)
var flowsMutex sync.RWMutex

// Packet processor (WRITER)
func processPacket(packet gopacket.Packet) {
    key := getFlowKey(packet)

    flowsMutex.Lock()       // πŸ”’ Lock for write
    defer flowsMutex.Unlock()

    if flow, exists := flows[key]; exists {
        flow.Update(packet)  // Update existing flow
    } else {
        flows[key] = NewFlow(packet) // Add new flow
    }
}

// Inference loop (READER)
func inferenceLoop() {
    for range time.Tick(10 * time.Second) {
        flowsMutex.RLock()  // πŸ”’ Lock for read
        snapshot := copyFlows(flows) // Safe read
        flowsMutex.RUnlock()

        runInference(snapshot)
    }
}

βœ” Why This Works:

  • Packet processing (processPacket) locks briefly for updates.

  • Inference (inferenceLoop) allows concurrent reads with RLock().


Common Pitfalls & Best Practices

⚠️ Deadlocks

  • Cause: Forgetting to Unlock() or RUnlock().

  • Fix: Always use defer Unlock().

Best Practices

  1. Use defer Unlock() (even if you return early).

  2. Prefer RWMutex if reads > writes.

  3. Minimize lock duration (do heavy work outside locks).


Summary

  • Goroutines β†’ Lightweight concurrency.

  • Channels β†’ Safe communication.

  • sync.Mutex β†’ Basic exclusive locking.

  • sync.RWMutex β†’ Optimized for read-heavy workloads.


3️⃣ ONNX Runtime in Go: ML Inference Without Python

Why ONNX?

  • Pre-trained models (PyTorch/TensorFlow β†’ ONNX format).

  • No Python dependency (runs natively in Go).

How It Works

  1. Load model (ort.NewAdvancedSession).

  2. Prepare input tensor (convert [][]float64 β†’ ONNX-compatible format).

  3. Run inference (session.Run()).

// Sketch based on github.com/yalue/onnxruntime_go; exact names and
// signatures depend on the binding version, so treat this as illustrative.
inputTensor, _ := ort.NewTensor(ort.NewShape(1, numFeatures), featuresData)
outputTensor, _ := ort.NewEmptyTensor[float32](ort.NewShape(1, 1))
session, _ := ort.NewAdvancedSession("model.onnx",
    []string{"input"}, []string{"output"},
    []ort.ArbitraryTensor{inputTensor}, []ort.ArbitraryTensor{outputTensor}, nil)

session.Run()
anomalyScore := outputTensor.GetData()[0]
if anomalyScore <= threshold {
    attackType := runClassification(outputTensor) // classify only flagged flows
}

Key Takeaways

βœ” Faster than Python (no GIL, no inter-process calls).
βœ” Supports most ML models (if exported to ONNX).
βœ” Memory-efficient (direct tensor passing).


4️⃣ Channels & signal.Notify: Graceful Shutdown

The Problem

  • If the app crashes, we lose flow data.

  • Need to clean up resources (DB connections, file handles).

The Solution: Signal Handling

  • signal.Notify captures SIGINT (Ctrl+C) / SIGTERM.

  • Channels coordinate shutdown.

func main() {
    stopChan := make(chan os.Signal, 1)
    signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)

    go processPackets()
    go inferenceLoop()

    <-stopChan  // Blocks until signal received
    cleanup()   // Close DB, save state, etc.
    os.Exit(0)
}

Key Takeaways

βœ” Prevents data loss on shutdown.
βœ” Cleanup hooks (close DB, flush logs).
βœ” Standard pattern for long-running services.



Final Thoughts

This project taught me how Go excels at:
βœ… Concurrency (goroutines + channels).
βœ… Performance (low-latency packet processing).
βœ… Integration (C-libs like ONNX, DBs like PostgreSQL).

If you’re building a real-time system, Go is a fantastic choice.
