Why Building ML Pipelines in Golang Is the Best Thing

Table of Contents
- Architecture Overview
- Packet → Inference Pipeline in Go
- Key Go Concepts Used
- 1️⃣ Goroutines: Lightweight Concurrency
- 2️⃣ sync.RWMutex: Safe Concurrent Map Access
- The Problem: Race Conditions
- The Solution: Mutexes (sync.Mutex & sync.RWMutex)
- Real-World Example: Flow Aggregation in My Project
- Common Pitfalls & Best Practices
- Summary
- 3️⃣ ONNX Runtime in Go: ML Inference Without Python
- 4️⃣ Channels & signal.Notify: Graceful Shutdown
- Final Thoughts

Architecture Overview
Here's what the system does to detect intrusions:
- Captures network packets (using gopacket)
- Aggregates flows (grouping packets by source/destination)
- Computes metrics (packet sizes, timings, etc.)
- Runs ML inference (using ONNX models)
- Logs results (PostgreSQL)
- Serves data via an API (Fiber + React frontend)

Now, let's see how Go makes this efficient!
Packet → Inference Pipeline in Go
Packet Capture (gopacket/pcap)
File: main.go → processPackets()
Library: github.com/google/gopacket

When a packet arrives (say, on eth0), Go reads it using:

    handle, err := pcap.OpenLive(interfaceName, snapshotLen, promiscuous, timeout)
    if err != nil {
        log.Fatal(err)
    }
    defer handle.Close()

    packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
    for packet := range packetSource.Packets() {
        processPacket(packet) // Handles each packet
    }

✅ Key Concept: Goroutines handle packets concurrently without blocking.
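Note: the loop as written handles packets one at a time. To actually fan work out to goroutines, each iteration can dispatch to one. This is a sketch, not the project's exact code (one goroutine per packet is unbounded; a worker pool is safer under load):

    for packet := range packetSource.Packets() {
        go processPacket(packet) // each packet handled in its own goroutine
    }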
Flow Key Generation (map[string]*FlowMetrics)
File: utils.go → getCanonicalFlowKey()

Each packet is grouped into a flow (like a TCP/UDP session). We generate a unique key:

    func getCanonicalFlowKey(packet gopacket.Packet) string {
        netFlow := packet.NetworkLayer().NetworkFlow()       // srcIP, dstIP
        transFlow := packet.TransportLayer().TransportFlow() // srcPort, dstPort
        // (Nil checks for non-IP packets omitted; a truly canonical key
        // would also order the endpoints so both directions of a session
        // map to the same flow.)
        return fmt.Sprintf("%s:%s-%s:%s", netFlow.Src(), transFlow.Src(), netFlow.Dst(), transFlow.Dst())
    }

✅ Key Concept: Go maps store flows efficiently.
Flow Metrics Update (Concurrency-Safe!)
File: flows.go → global map[string]*FlowMetrics

We track:
- Packet lengths
- Timestamps
- Packet counts

But since multiple goroutines update this map, we use sync.RWMutex to prevent race conditions:

    var flows = make(map[string]*FlowMetrics)
    var flowsMutex sync.RWMutex

    func updateFlow(key string, packetLen int) {
        flowsMutex.Lock()
        defer flowsMutex.Unlock()
        if flow, exists := flows[key]; exists {
            flow.Update(packetLen) // Update existing flow
        } else {
            flows[key] = NewFlowMetrics() // Create new flow
        }
    }

✅ Key Concept: Mutexes ensure thread-safe map operations.
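The FlowMetrics type itself isn't shown in the post. Here's a minimal sketch of what it could hold, given the metrics listed above (the field names are illustrative assumptions, not the project's definition):

    type FlowMetrics struct {
        PacketCount int
        TotalBytes  int
        PacketSizes []int     // per-packet lengths, for mean/std later
        FirstSeen   time.Time // for duration and packet-rate metrics
        LastSeen    time.Time
    }

    func NewFlowMetrics() *FlowMetrics {
        now := time.Now()
        return &FlowMetrics{FirstSeen: now, LastSeen: now}
    }

    func (f *FlowMetrics) Update(packetLen int) {
        f.PacketCount++
        f.TotalBytes += packetLen
        f.PacketSizes = append(f.PacketSizes, packetLen)
        f.LastSeen = time.Now()
    }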
Inference Trigger (Every 10 Seconds)
File: main.go → inferenceLoop()

Every 10 seconds, we:
- Take a snapshot of flows (to avoid race conditions)
- Compute final metrics (mean, standard deviation, packet rate, etc.)
- Prepare a feature matrix ([][]float64) for the ML models

    func inferenceLoop() {
        for range time.Tick(10 * time.Second) {
            flowsMutex.RLock()
            snapshot := copyFlows(flows) // Safe copy
            flowsMutex.RUnlock()

            features := prepareFeatureMatrix(snapshot)
            runInference(features) // Pass to ML model
        }
    }

✅ Key Concept: time.Tick for periodic tasks.
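copyFlows isn't listed in the post either; a plausible implementation (a hypothetical helper) copies each entry so inference works on a stable snapshot while packet processing keeps mutating the live map:

    func copyFlows(src map[string]*FlowMetrics) map[string]*FlowMetrics {
        dst := make(map[string]*FlowMetrics, len(src))
        for k, v := range src {
            copied := *v // copy the struct value
            dst[k] = &copied
        }
        return dst
    }

One caveat: if FlowMetrics contains slices, those still share backing arrays after a struct copy; a production snapshot would deep-copy them too.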
Preprocessing (Clean & Normalize Data)
File: utils.go → preprocessFeatures()

The Challenge
ML models need normalized data (mean = 0, std = 1). In Python, we'd use sklearn.preprocessing.

Go Implementation
Before feeding data to the ML model, we (sketched below):
- Replace Inf → NaN
- Fill missing values (median imputation)
- Standardize (Z-score scaling)

✅ Key Concept: Data pipelines in Go are fast (no Python bottlenecks!).
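A minimal sketch of those three steps in plain Go (not the project's exact code; it only needs math and sort from the standard library):

    // preprocessFeatures normalizes the feature matrix in place, column by
    // column: Inf → NaN, median imputation, then z-score scaling.
    func preprocessFeatures(features [][]float64) {
        if len(features) == 0 {
            return
        }
        rows, cols := len(features), len(features[0])
        for c := 0; c < cols; c++ {
            // Pass 1: turn Inf into NaN and collect the finite values.
            finite := make([]float64, 0, rows)
            for r := 0; r < rows; r++ {
                if math.IsInf(features[r][c], 0) {
                    features[r][c] = math.NaN()
                }
                if !math.IsNaN(features[r][c]) {
                    finite = append(finite, features[r][c])
                }
            }
            if len(finite) == 0 {
                continue // nothing usable in this column
            }
            // Pass 2: median imputation for missing values.
            sort.Float64s(finite)
            median := finite[len(finite)/2]
            var sum float64
            for r := 0; r < rows; r++ {
                if math.IsNaN(features[r][c]) {
                    features[r][c] = median
                }
                sum += features[r][c]
            }
            // Pass 3: z-score scaling so each column has mean 0, std 1.
            mean := sum / float64(rows)
            var sumSq float64
            for r := 0; r < rows; r++ {
                d := features[r][c] - mean
                sumSq += d * d
            }
            std := math.Sqrt(sumSq / float64(rows))
            if std == 0 {
                std = 1 // constant column: avoid dividing by zero
            }
            for r := 0; r < rows; r++ {
                features[r][c] = (features[r][c] - mean) / std
            }
        }
    }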
Logging to PostgreSQL
File: inference.go

We log:
- flow_id
- anomaly_score
- attack_type (if any)

Best Practices
- Use connection pooling (avoid opening/closing a connection per query).
- Use prepared statements (faster repeated queries).

    _, err := db.Exec(`
        INSERT INTO detections
        (flow_id, score, status, attack_type)
        VALUES ($1, $2, $3, $4)`,
        flowID, score, status, attackType,
    )
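For the hot insert path, the prepared-statement pattern from database/sql looks roughly like this (a sketch; note that *sql.DB already pools connections for you):

    stmt, err := db.Prepare(`
        INSERT INTO detections (flow_id, score, status, attack_type)
        VALUES ($1, $2, $3, $4)`)
    if err != nil {
        log.Fatal(err)
    }
    defer stmt.Close()

    // Reuse the statement for every detection instead of re-parsing the SQL.
    if _, err := stmt.Exec(flowID, score, status, attackType); err != nil {
        log.Printf("insert failed: %v", err)
    }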
net/http vs. Fiber: Choosing an API Framework

Option 1: Standard net/http
- Simple, no dependencies.
- Manual routing (http.HandleFunc).

    http.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) {
        logs := fetchLogsFromDB()
        json.NewEncoder(w).Encode(logs)
    })
    http.ListenAndServe(":8080", nil)

Option 2: Fiber (Express-like API)
- Faster (built on fasthttp).
- Middleware support (auth, logging).

    app := fiber.New()
    app.Get("/logs", func(c *fiber.Ctx) error {
        logs := fetchLogsFromDB()
        return c.JSON(logs)
    })
    app.Listen(":8080")

Key Takeaways
✅ net/http → good for small APIs.
✅ Fiber → better for complex routing/middleware.
Frontend (React)
The frontend polls /logs every few seconds and displays:
- Attack alerts
- Flow details
- Network throughput
Key Go Concepts Used
✅ Goroutines → concurrent packet processing
✅ sync.RWMutex → thread-safe map access
✅ net/http / Fiber → REST API
✅ ONNX Runtime → ML inference in Go
✅ Channels & signal.Notify → graceful shutdown

Now, let's dig deeper into the key Golang concepts that made this project efficient, scalable, and production-ready. If you're learning Go (like me!), understanding these will help you write better concurrent, performant, and maintainable systems.
1️⃣ Goroutines: Lightweight Concurrency

What They Do
- Goroutines are lightweight threads managed by the Go runtime.
- They enable concurrent execution without the overhead of OS threads.

How I Used Them
- Packet processing (processPackets()) runs in one goroutine.
- The inference loop (inferenceLoop()) runs in another.
- The API server (startAPIServer()) runs independently.

    func main() {
        go processPackets() // Handles live packet capture
        go inferenceLoop()  // Runs ML every 10 seconds
        go startAPIServer() // Serves REST API

        // Block until SIGINT/SIGTERM. The channel must be registered
        // with signal.Notify, or it would never receive anything
        // (see the graceful-shutdown section below).
        stop := make(chan os.Signal, 1)
        signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
        <-stop
    }

Key Takeaways
✅ No manual thread management (unlike C++/Java).
✅ Efficient (you can spawn thousands without high memory cost).
✅ Use the go keyword to run any function concurrently.
2️⃣ sync.RWMutex: Safe Concurrent Map Access

The Problem
- Multiple goroutines (packet processor + inference loop) access the same map[string]*FlowMetrics.
- Race conditions can occur if access is not synchronized.

The Solution: Mutexes
sync.RWMutex allows:
- Multiple readers (concurrent reads when no writer holds the lock).
- An exclusive lock for writes.

    var flows = make(map[string]*FlowMetrics)
    var flowsMutex sync.RWMutex

    // Safe read (for inference)
    flowsMutex.RLock()
    defer flowsMutex.RUnlock()
    flow, exists := flows[key]

    // Safe write (packet processing)
    flowsMutex.Lock()
    defer flowsMutex.Unlock()
    flows[key] = newFlow

Key Takeaways
✅ RLock() → for read-heavy operations (e.g., inference).
✅ Lock() → for writes (e.g., updating flows).
✅ Always defer Unlock() to prevent deadlocks.
Example: Running Two Tasks Concurrently

    func printNumbers() {
        for i := 1; i <= 5; i++ {
            fmt.Println(i)
            time.Sleep(100 * time.Millisecond)
        }
    }

    func printLetters() {
        for c := 'a'; c <= 'e'; c++ {
            fmt.Printf("%c\n", c)
            time.Sleep(100 * time.Millisecond)
        }
    }

    func main() {
        go printNumbers() // Runs in background
        go printLetters() // Runs in background

        // Wait to prevent the program from exiting
        time.Sleep(1 * time.Second)
    }

Output (interleaved):
1
a
2
b
3
c
...

✅ Key Idea: the go keyword runs a function in a new goroutine.
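time.Sleep is fine for a demo, but in real code sync.WaitGroup is the reliable way to wait for goroutines to finish. The same example, reworked as a sketch:

    func main() {
        var wg sync.WaitGroup
        wg.Add(2)
        go func() { defer wg.Done(); printNumbers() }()
        go func() { defer wg.Done(); printLetters() }()
        wg.Wait() // blocks until both goroutines call Done()
    }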
Channels: Communicating Between Goroutines
Since goroutines run independently, we need a way to share data safely.
- Channels are like thread-safe pipes for sending/receiving data.

Example: Sending Data Between Goroutines

    func worker(jobs <-chan int, results chan<- int) {
        for job := range jobs {
            results <- job * 2 // Send result back
        }
    }

    func main() {
        jobs := make(chan int, 5)
        results := make(chan int, 5)
        go worker(jobs, results) // Start worker

        // Send jobs
        for i := 1; i <= 3; i++ {
            jobs <- i
        }
        close(jobs) // No more jobs

        // Collect results
        for i := 1; i <= 3; i++ {
            fmt.Println(<-results)
        }
    }

Output:
2
4
6

✅ Key Idea:
- jobs <- i sends data to the channel.
- <-results receives data from the channel.
The Problem: Race Conditions

What Happens Without Synchronization?
If multiple goroutines access shared data, bad things happen:
- Example: two goroutines incrementing the same counter.

    var counter = 0

    func increment() {
        for i := 0; i < 1000; i++ {
            counter++ // Unsafe!
        }
    }

    func main() {
        go increment()
        go increment()
        time.Sleep(1 * time.Second)
        fmt.Println(counter) // Might print 1200 instead of 2000!
    }

Why?
- counter++ is not atomic (read → modify → write).
- Goroutines interfere, leading to lost updates.
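Go's race detector (go run -race) will flag exactly this bug at runtime. And for a plain counter, the sync/atomic package is a lighter fix than a mutex; a minimal sketch:

    var counter int64

    func increment() {
        for i := 0; i < 1000; i++ {
            atomic.AddInt64(&counter, 1) // atomic read-modify-write
        }
    }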
The Solution: Mutexes (sync.Mutex & sync.RWMutex)

What Is a Mutex?
- "Mutual exclusion lock": only one goroutine can hold it at a time.
- Others wait until it's unlocked.

Using sync.Mutex

    var counter = 0
    var mutex sync.Mutex

    func increment() {
        for i := 0; i < 1000; i++ {
            mutex.Lock()   // Acquire the lock
            counter++      // Safe update
            mutex.Unlock() // Release the lock
        }
    }

✅ Now counter will always be 2000.
Optimizing with sync.RWMutex (Read-Write Mutex)

Problem: sync.Mutex is too strict if we have:
- Many readers (goroutines that only read data).
- Few writers (goroutines that modify data).

How sync.RWMutex Works

| Operation | Effect |
| --- | --- |
| Lock() | Exclusive lock (only one writer) |
| Unlock() | Release the writer lock |
| RLock() | Shared lock (many readers) |
| RUnlock() | Release a reader lock |
Example: Safe Map Access

    var flows = make(map[string]int)
    var flowsMutex sync.RWMutex

    // Writer (updates a flow)
    func updateFlow(key string) {
        flowsMutex.Lock()         // Exclusive lock
        defer flowsMutex.Unlock() // Unlock when done
        flows[key]++
    }

    // Reader (gets flow stats)
    func getFlow(key string) int {
        flowsMutex.RLock()         // Shared lock (many can read)
        defer flowsMutex.RUnlock() // Release the read lock
        return flows[key]
    }

✅ Key Idea:
- Writers use Lock() → block all others.
- Readers use RLock() → allow concurrent reads.
Real-World Example: Flow Aggregation in My Project

Problem:
- The packet processor updates flows in the map.
- The inference loop reads flows every 10 seconds.
- Race condition risk!

Solution: sync.RWMutex

    var flows = make(map[string]*FlowMetrics)
    var flowsMutex sync.RWMutex

    // Packet processor (WRITER)
    func processPacket(packet gopacket.Packet) {
        key := getFlowKey(packet)
        flowsMutex.Lock() // Lock for write
        defer flowsMutex.Unlock()
        if flow, exists := flows[key]; exists {
            flow.Update(packet) // Update existing flow
        } else {
            flows[key] = NewFlow(packet) // Add new flow
        }
    }

    // Inference loop (READER)
    func inferenceLoop() {
        for range time.Tick(10 * time.Second) {
            flowsMutex.RLock()           // Lock for read
            snapshot := copyFlows(flows) // Safe read
            flowsMutex.RUnlock()
            runInference(snapshot)
        }
    }

✅ Why This Works:
- Packet processing (processPacket) locks briefly for updates.
- Inference (inferenceLoop) allows concurrent reads with RLock().
Common Pitfalls & Best Practices

⚠️ Deadlocks
- Cause: forgetting to Unlock() or RUnlock().
- Fix: always use defer Unlock().

Best Practices
- Use defer Unlock() (even if you return early).
- Prefer RWMutex when reads outnumber writes.
- Minimize lock duration (do heavy work outside locks).
Summary
- Goroutines → lightweight concurrency.
- Channels → safe communication.
- sync.Mutex → basic exclusive locking.
- sync.RWMutex → optimized for read-heavy workloads.
3️⃣ ONNX Runtime in Go: ML Inference Without Python

Why ONNX?
- Pre-trained models (PyTorch/TensorFlow → ONNX format).
- No Python dependency (runs natively in Go).

How It Works
1. Load the model (ort.NewAdvancedSession).
2. Prepare the input tensor (convert [][]float64 → an ONNX-compatible flat float32 array).
3. Run inference (session.Run()).

    // Simplified sketch: the exact signatures vary between Go ONNX
    // Runtime bindings, and real code checks every error.
    session, _ := ort.NewAdvancedSession("model.onnx")
    inputTensor := ort.NewTensor(ort.Float32, featuresShape, featuresData)
    outputs, _ := session.Run([]*ort.Tensor{inputTensor})

    anomalyScore := outputs[0].Value().(float32)
    if anomalyScore <= threshold {
        attackType := runClassification(outputs) // second-stage classifier
        logDetection(anomalyScore, attackType)   // hypothetical logging helper
    }
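ONNX tensors are a flat array plus a shape, which is why step 2 above flattens the [][]float64 feature matrix. A hypothetical helper for that conversion (flattenFeatures is my name, not the project's):

    func flattenFeatures(features [][]float64) ([]float32, []int64) {
        rows, cols := len(features), len(features[0])
        data := make([]float32, 0, rows*cols)
        for _, row := range features {
            for _, v := range row {
                data = append(data, float32(v)) // ONNX models commonly expect float32
            }
        }
        return data, []int64{int64(rows), int64(cols)} // shape: [rows, cols]
    }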
Key Takeaways
✅ Faster than Python (no GIL, no inter-process calls).
✅ Supports most ML models (if exported to ONNX).
✅ Memory-efficient (direct tensor passing).
4️⃣ Channels & signal.Notify: Graceful Shutdown

The Problem
- If the app is killed without warning, we lose flow data.
- We need to clean up resources (DB connections, file handles).

The Solution: Signal Handling
- signal.Notify captures SIGINT (Ctrl+C) / SIGTERM.
- Channels coordinate the shutdown.

    func main() {
        stopChan := make(chan os.Signal, 1)
        signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)

        go processPackets()
        go inferenceLoop()

        <-stopChan // Blocks until a signal is received
        cleanup()  // Close DB, save state, etc.
        os.Exit(0)
    }

Key Takeaways
✅ Prevents data loss on shutdown.
✅ Cleanup hooks (close DB, flush logs).
✅ A standard pattern for long-running services.
Final Thoughts
This project taught me how Go excels at:
✅ Concurrency (goroutines + channels).
✅ Performance (low-latency packet processing).
✅ Integration (C libraries like ONNX Runtime, databases like PostgreSQL).

If you're building a real-time system, Go is a fantastic choice.