Go Concurrency Patterns With Real World Applications : Pipeline and Fan Out/In

Tharindu Eranga

Introduction

Concurrency is one of Go’s core strengths, enabling efficient execution of tasks. In this article, we will explore two important concurrency patterns: Pipeline and Fan-Out/Fan-In.

To begin, we will demonstrate the Pipeline pattern with an example that generates random numbers and determines whether each one is prime. We use primality testing because it is a computationally heavy task.

Note: Other CPU-heavy tasks, such as high-dimensional matrix multiplication, Fibonacci sequence computation, or sorting large datasets, would work just as well.

Then, we will introduce the Fan-Out/In pattern to the same pipeline. It distributes work among a group of workers to improve performance by parallelizing CPU use and I/O.

To better understand the impact of Fan-Out, we will compare execution times before and after implementing it within the same pipeline. This comparison will highlight the performance improvement achieved by running the slow stage on multiple goroutines.

Finally, we will design real-world service architectures that use these patterns to build high-performance data processing systems in Go.

Concurrency Patterns

Pipeline is a sequence of stages where each stage performs a transformation on the data and passes it to the next stage through channels. The first stage acts as a producer, while the last stage serves as the consumer. Intermediate stages transform the data and pass it along the pipeline.

Each intermediate stage can have multiple inbound and outbound channels. The first stage has only outbound channels, and the last stage has only inbound channels. This structure allows data to flow in a linear fashion from one stage to the next.

To enhance scalability, the Fan Out and Fan In patterns can be applied to the stages.

The Fan Out pattern enables a stage to distribute its workload across multiple workers, increasing throughput by processing data concurrently.

The Fan In pattern allows multiple concurrent processes to aggregate data into a single stage, ensuring that data from different sources is combined effectively.
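The three ideas above can be sketched in a few lines. This is a minimal illustration, not the article's prime pipeline: the function names generate, square, merge, and sumOfSquares are made up for this sketch, and summing the results is chosen because it does not depend on the order in which workers finish.

```go
package main

import (
	"fmt"
	"sync"
)

// generate is the producer stage: it emits 1..n and then closes its channel.
func generate(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// square is a transforming stage; several copies can share one input channel.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// merge is the fan-in step: it copies every input channel into one output.
func merge(ins ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	// close the output only after every copier has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumOfSquares wires the stages together with the given number of workers.
func sumOfSquares(n, workers int) int {
	src := generate(n)
	outs := make([]<-chan int, workers)
	for i := range outs {
		outs[i] = square(src) // fan out: every worker reads the same source
	}
	total := 0
	for v := range merge(outs...) { // fan in: one channel to read from
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares(10, 3)) // prints 385
}
```

Because every stage closes its output when its input is exhausted, the shutdown propagates from the producer to the consumer without any extra signalling.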

Go Concurrency Primitives

There are a couple of Go concurrency primitives that we are going to use in this example. Below is a short explanation of each.

Goroutines

A goroutine is a lightweight thread managed by the Go runtime. Goroutines live in user space, and the Go scheduler multiplexes thousands of them onto a small number of OS threads.

Unlike traditional threads, goroutines have minimal memory overhead, so we can easily spin up thousands of them at once to execute functions concurrently.

These little routines can also create chaos (leaks, deadlocks, data races) if we do not handle them correctly !!!

Unbuffered Channels

We are going to use unbuffered channels to enable synchronous communication between pipeline stages. There are a couple of things to keep in mind when working with unbuffered channels.

  • A send operation on an unbuffered channel blocks the sending goroutine until another goroutine performs the corresponding receive on the same channel.

  • A receive operation on an unbuffered channel blocks if no data is available to read from the channel until a goroutine sends a value.

This blocking behavior ensures that both the sender and receiver synchronize at the point of communication, making unbuffered channels useful for synchronization between Goroutines.

Wait Groups

A WaitGroup in Go is a concurrency primitive provided by the sync package. It allows us to wait for multiple goroutines to complete before the program proceeds.
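As a small sketch of the idea, assuming a made-up helper named doubleAll, each goroutine below registers with the WaitGroup before it starts and signals completion with Done, and the caller blocks on Wait until all of them have finished.

```go
package main

import (
	"fmt"
	"sync"
)

// doubleAll doubles every input in its own goroutine and waits for all of them.
func doubleAll(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, v := range inputs {
		wg.Add(1) // register the goroutine before starting it
		go func(i, v int) {
			defer wg.Done()
			results[i] = v * 2 // each goroutine writes only its own slot
		}(i, v)
	}
	wg.Wait() // block until every goroutine has called Done
	return results
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3})) // prints [2 4 6]
}
```

The Collector stage later in this article uses the same mechanism to decide when the fan-in channel can be closed.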

  • Besides these Go primitives, we use generic types for type safety and reusability instead of interface{}. Unlike interface{}, generics are checked at compile time, and they need no type assertions or reflection, which avoids that runtime overhead !!!

Pipeline

This example pipeline consists of three stages, as illustrated below. Random numbers are generated and then checked for primality.

  1. Producer: This stage generates numbers and sends them into the funnel channel.

  2. Processor: This stage reads from the funnel channel, tests each number for primality, and writes the primes to the processed channel.

  3. Consumer: This stage consumes the prime numbers from the processed channel.

Each stage works in order, passing data from one to the next, starting with the Producer and ending with the Consumer.

Let’s code this up !!!

Below is the code for the pipeline in the diagram above. Let’s run it and find out how long it takes to execute. No stage is scaled up here; data simply passes through each pipeline stage. We also limit the take count to 10 in the consumer, which keeps the producer under control and makes the code more efficient.

That way, the producer blocks once the downstream goroutines stop reading after the 10th prime. Remember? That’s how unbuffered channels work !!!

We need to install Go, create a project folder with a main.go file, paste in the code below, and then build and run the binary. Please refer to the link below to install and get started with Go.

https://go.dev/doc/tutorial/getting-started

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// Producer runs an infinite loop that sends values generated by fn into the funnel channel.
// Producer exits when the done channel is closed.
func Producer[T any](
    done <-chan struct{},
    fn func() T,
) <-chan T {
    funnel := make(chan T)

    go func() {
        defer close(funnel)
        for {
            select {
            case <-done:
                return
            case funnel <- fn():
            }
        }
    }()

    return funnel
}

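// Processor checks whether each number received from the funnel channel is prime.
// Processor returns a primeStream channel that carries only the prime numbers.
// Processor exits when the done channel is closed or the funnel channel is closed.
func Processor(
    done <-chan struct{},
    funnel <-chan int,
) <-chan int {
    primeStream := make(chan int)

    // isPrime uses naive trial division on purpose, so this stage stays slow.
    isPrime := func(number int) bool {
        // 0 and 1 are not prime.
        if number < 2 {
            return false
        }
        for i := number - 1; i > 1; i-- {
            if number%i == 0 {
                return false
            }
        }
        return true
    }

    go func() {
        defer close(primeStream)
        for {
            select {
            case <-done:
                return
            case number, ok := <-funnel:
                // a closed funnel would otherwise yield zero values forever.
                if !ok {
                    return
                }
                if !isPrime(number) {
                    continue
                }
                // guard the send so the goroutine can still exit on done.
                select {
                case <-done:
                    return
                case primeStream <- number:
                }
            }
        }
    }()

    return primeStream
}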

// Consumer takes n items from the processedStream channel and
// writes them to the takenStream channel.
// Consumer exits when the done channel is closed or processedStream is closed.
func Consumer[T any](
    done <-chan struct{},
    processedStream <-chan T,
    n int,
) <-chan T {
    takenStream := make(chan T)

    go func() {
        defer close(takenStream)
        for i := 0; i < n; i++ {
            select {
            case <-done:
                return
            // receive inside the select so that done is honoured while waiting.
            case value, ok := <-processedStream:
                if !ok {
                    return
                }
                select {
                case <-done:
                    return
                case takenStream <- value:
                }
            }
        }
    }()

    return takenStream
}

func main() {
    // record the start time to measure the total execution time of the pipeline.
    start := time.Now()
    // create the done channel that signals the pipeline stages to stop work and exit.
    done := make(chan struct{})
    // closing the done channel signals all the pipeline goroutines to exit.
    defer close(done)
    // random number generator.
    rn := func() int { return rand.Intn(500000000) }
    // run the producer.
    funnelChannel := Producer(done, rn)
    // run the processor.
    processedChannel := Processor(done, funnelChannel)
    // run the consumer and take the first 10 primes.
    for output := range Consumer(done, processedChannel, 10) {
        // output the prime numbers.
        fmt.Println(output)
    }
    // log the execution time.
    fmt.Println("Pipeline execution finished and took:", time.Since(start))
}

The screenshot above shows how long the pipeline took to run: more than 4 minutes on my machine. This demonstrates that the pipeline has a slow stage, the processor, because the prime-finding computation takes a long time. We need a way to scale this stage. That’s where the Fan-Out pattern comes in.

Let’s implement that now !!!

Pipeline With Fan Out/In

To make the most of the logical CPU cores available to us, we scale up the processor stage using goroutines. Each processor goroutine writes to its own output channel, which distributes the workload. To consolidate the results, we add a stage called collector to the pipeline, which aggregates data from all Fan-Out channels into a single Fan-In channel.

  1. Producer - This stage generates numbers and sends them into the funnel channel.

  2. Processor - This stage computes prime numbers and is fanned out to multiple instances using goroutines. Each goroutine writes to its own output channel.
    Ex : If we fan out to 10 workers of the processor function, we get 10 output channels.

  3. Collector - This stage collects data from the fan-out channels.

  4. Consumer - This stage consumes the processed data from the fan-in channel.

This diagram illustrates the pipeline with the new changes. We use the Go runtime package to fan out the processor stage based on the available CPU count.

Let’s code the new solution !!!

package main

import (
    "fmt"
    "math/rand"
    "runtime"
    "sync"
    "time"
)

// Producer runs a goroutine that sends values generated by fn into the funnel channel.
// Producer exits when the done channel is closed.
func Producer[T any](
    done <-chan struct{},
    fn func() T,
) <-chan T {
    funnel := make(chan T)

    go func() {
        defer close(funnel)
        for {
            select {
            case <-done:
                return
            case funnel <- fn():
            }
        }
    }()

    return funnel
}

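// Processor checks whether each number received
// from the funnel channel is prime.
// Processor returns a channel that carries only the prime numbers.
// Processor exits when the done channel is closed or the funnel channel is closed.
func Processor(
    done <-chan struct{},
    funnel <-chan int,
) <-chan int {
    primeStream := make(chan int)

    // isPrime uses naive trial division on purpose, so this stage stays slow.
    isPrime := func(number int) bool {
        // 0 and 1 are not prime.
        if number < 2 {
            return false
        }
        for i := number - 1; i > 1; i-- {
            if number%i == 0 {
                return false
            }
        }
        return true
    }

    go func() {
        defer close(primeStream)
        for {
            select {
            case <-done:
                return
            case number, ok := <-funnel:
                // a closed funnel would otherwise yield zero values forever.
                if !ok {
                    return
                }
                if !isPrime(number) {
                    continue
                }
                // guard the send so the goroutine can still exit on done.
                select {
                case <-done:
                    return
                case primeStream <- number:
                }
            }
        }
    }()

    return primeStream
}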

// Collector transfers all the data from the fan-out channels
// into a single channel (fanInStream).
// Collector exits when the done channel is closed or when all fan-out channels are closed.
func Collector[T any](
    done <-chan struct{},
    channels ...<-chan T,
) <-chan T {
    var wg sync.WaitGroup

    fanInStream := make(chan T)

    transfer := func(in <-chan T) {
        defer wg.Done()
        for n := range in {
            select {
            case <-done:
                return
            case fanInStream <- n:
            }
        }
    }

    for _, c := range channels {
        wg.Add(1)
        go transfer(c)
    }

    go func() {
        wg.Wait()
        close(fanInStream)
    }()

    return fanInStream
}

// Consumer takes n items from the processed stream channel and
// writes them to the takenStream channel.
// Consumer exits when the done channel is closed or processedStream is closed.
func Consumer[T any](
    done <-chan struct{},
    processedStream <-chan T,
    n int,
) <-chan T {
    takenStream := make(chan T)

    go func() {
        defer close(takenStream)
        for i := 0; i < n; i++ {
            select {
            case <-done:
                return
            // receive inside the select so that done is honoured while waiting.
            case value, ok := <-processedStream:
                if !ok {
                    return
                }
                select {
                case <-done:
                    return
                case takenStream <- value:
                }
            }
        }
    }()

    return takenStream
}

func main() {
    // record the start time to measure the total execution time of the pipeline.
    start := time.Now()
    // create the done channel that signals the pipeline stages to stop work and exit.
    done := make(chan struct{})
    // closing done signals all the pipeline goroutines to exit.
    defer close(done)
    // random number generator for the pipeline.
    // the upper bound matches the first program so that the timings are comparable.
    rn := func() int { return rand.Intn(500000000) }
    // run the producer.
    funnelChannel := Producer(done, rn)
    // get the number of logical CPUs usable by the current process.
    cpuCount := runtime.NumCPU()
    // create one fan-out channel per CPU.
    fanOutChannels := make([]<-chan int, cpuCount)
    // run one processor worker per CPU, all reading from the same funnel.
    for i := 0; i < cpuCount; i++ {
        fanOutChannels[i] = Processor(done, funnelChannel)
    }
    // run the collector.
    fanInChannel := Collector(done, fanOutChannels...)
    // run the consumer and take the first 10 primes.
    for output := range Consumer(done, fanInChannel, 10) {
        fmt.Println(output)
    }
    // log the execution time.
    fmt.Println("Pipeline execution finished and took:", time.Since(start))
}

As we can see in the output of the modified pipeline program, there is a significant performance improvement. This is because we scaled up the slow processor stage by running multiple workers across the available CPU cores.

This is how we can use the pipeline and fan-out/fan-in patterns in Go to process data concurrently and to structure concurrent processing in stages.

Sequenced Data Pipeline

Sometimes we just need to process the data without caring about the order of the pipeline’s input and output, for example in an image/video processing pipeline.

Here, we are going to talk about the case where the pipeline must output data in the same order in which it was fed while the stages are scaled up. Our current version of the pipeline doesn’t support this.

So let’s change that !!!

We can achieve this by accessing the channels in a circular-buffer fashion (a data storage and processing mechanism) and relying on the blocking and unblocking behavior of our pipeline stages.

Circular-buffer fashion refers to reading or processing data in a looped, cyclic order. It is typically used in scenarios where we need a continuous flow of data without stopping due to buffer overflow.

  • Fixed Size: A preallocated buffer of a fixed size (e.g., N elements).

  • Cyclic Access: Once the end is reached, it wraps around to the beginning.

In a classic circular buffer, the producer writes data sequentially and, when the buffer is full, new writes overwrite the oldest data, while the consumer reads in a cyclic manner, always picking the next available item. Our pipeline borrows only the cyclic access: the slots are unbuffered channels, so a write blocks until it is read and nothing is overwritten.
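The cyclic access can be illustrated in isolation. In this sketch, distribute and collect are hypothetical helpers, and the lanes are buffered channels only so that the example can run without worker goroutines; it assumes lanes is greater than zero. Writing with index i % lanes and reading the lanes back in the same rotation restores the input order.

```go
package main

import "fmt"

// distribute deals values across `lanes` channels in cyclic order.
func distribute(values []int, lanes int) []chan int {
	chans := make([]chan int, lanes)
	for i := range chans {
		// each lane receives at most ceil(len(values)/lanes) items.
		chans[i] = make(chan int, (len(values)+lanes-1)/lanes)
	}
	for i, v := range values {
		chans[i%lanes] <- v // wrap around after the last lane
	}
	for _, c := range chans {
		close(c)
	}
	return chans
}

// collect reads the lanes in the same cyclic order, restoring input order.
func collect(chans []chan int) []int {
	var out []int
	for i := 0; ; i = (i + 1) % len(chans) {
		v, ok := <-chans[i]
		if !ok {
			// the lane whose turn it is has nothing left, so all data was read.
			return out
		}
		out = append(out, v)
	}
}

func main() {
	lanes := distribute([]int{1, 2, 3, 4, 5, 6, 7}, 3)
	fmt.Println(collect(lanes)) // prints [1 2 3 4 5 6 7]
}
```

The order survives because the reader visits the lanes in exactly the rotation the writer used, regardless of how quickly each lane is filled.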

Below is the updated diagram for the sequenced data pipeline. We now have a solid understanding of how distributing tasks to a worker pool can process data efficiently. In this example, we will focus on task distribution and on maintaining the order of data as it moves through the pipeline. Put simply, the pipeline takes its input sequentially and processes it concurrently for faster processing.

Let’s code an example !!!

The producer will generate the numbers from 1 to 100 and send them to the raw stream channels using a circular buffer approach. We’ll set the worker count to 10, meaning each raw stream channel, worker, and processed stream channel will handle 10 numbers. The workers will multiply these numbers by 2 and send them to the processed stream channels. Finally, the consumer will read from the processed channels, using a circular buffer to preserve the order of the data from input to output.

First, let’s create a new project with a separate package for our pipeline. That way we can keep things clean.

package pipeline

import (
    "context"
    "fmt"
    "strconv"

    "golang.org/x/sync/errgroup"
)

const Workers = 10
const BatchSize = 100

// Pipeline interface
type Pipeline interface {
    Start() error
    Producer(ctx context.Context) error
    CloseRawStreams()
    Processor(ctx context.Context) error
    Worker(ctx context.Context, index uint) error
    Consumer(ctx context.Context) error
}

// pipeline implements the Pipeline interface.
type pipeline struct {
    ctx              context.Context
    rawStreams       []chan int
    processedStreams []chan int
}

// NewPipeline creates a new instance of pipeline.
func NewPipeline(ctx context.Context) Pipeline {
    p := &pipeline{
        ctx:              ctx,
        rawStreams:       make([]chan int, Workers),
        processedStreams: make([]chan int, Workers),
    }
    // let's initiate the required raw and processed stream channels.
    for i := uint(0); i < Workers; i++ {
        p.rawStreams[i] = make(chan int)
        p.processedStreams[i] = make(chan int)
    }

    return p
}

// Start launches the pipeline stages as goroutines and waits for them to finish.
// we use the context package and the errgroup package (golang.org/x/sync/errgroup) because
// they provide the tools for handling errors and cancelling the background routines.
//
// what is the context ???
//
// context is commonly used in Go concurrent programming to control the lifecycle of goroutines and prevent resource leaks.
// simply put, we can create a parent context and use it to create child contexts.
// if we cancel the parent context, the child contexts are cancelled as well,
// but child contexts can't cancel their parent context.
// we can pass these contexts along the call graph of the goroutines.
// they are useful for signalling downstream goroutines to exit.
func (p *pipeline) Start() error {
    // create a new child context using main context.
    eGroup, ctx := errgroup.WithContext(p.ctx)
    // execute each pipeline stage as a go routine.
    // producer stage
    // generate data to pipeline using circular buffer fashion.
    eGroup.Go(func() error {
        return p.Producer(ctx)
    })
    // Processor stage.
    // process by multiple workers.
    eGroup.Go(func() error {
        return p.Processor(ctx)
    })
    // Consumer stage.
    // collect the data in the same order using circular buffer fashion.
    eGroup.Go(func() error {
        return p.Consumer(ctx)
    })
    // wait for all the stages to finish their work.
    if err := eGroup.Wait(); err != nil {
        fmt.Printf("error while processing the pipeline: %+v \n", err)

        return err
    }
    // log the end of the pipeline.
    fmt.Println("stages have finished executing")
    // no errors return nil.
    return nil
}

// Producer generates data into the raw stream channels in circular buffer fashion.
// in the real world, this stage would most likely read from something
// like a database and feed the records into the pipeline.
func (p *pipeline) Producer(
    ctx context.Context,
) error {
    // close channels when done.
    defer p.CloseRawStreams()
    // let's say we have 100 records in a batch and 10 workers in the pipeline.
    // we will distribute the records evenly among the available workers.
    // so each worker will get 10 tasks to process.
    index := 0
    // generate numbers 1-100
    for i := 1; i <= BatchSize; i++ {
        idx := index % Workers
        select {
        case <-ctx.Done():
            return ctx.Err()
        // send the data to the raw channel; selecting on the send keeps
        // the producer cancellable while it waits for a worker.
        case p.rawStreams[idx] <- i:
            // log the record and channel index
            fmt.Printf("producer sent %d → raw stream %d \n", i, idx)
            // increment the channel index.
            index++
        }
    }

    fmt.Println("producer finished executing")

    return nil
}

// CloseRawStreams closes all raw stream channels after the data has been distributed by the producer.
// this signals the workers to stop their work and close the processed channels as well.
// eventually the consumer receives the exit signal,
// so no goroutines are left using resources after the pipeline has finished.
func (p *pipeline) CloseRawStreams() {
    for index := uint(0); index < Workers; index++ {
        close(p.rawStreams[index])
    }
    fmt.Println("raw stream channels are closed")
}

// Processor is the stage that we scale up to the given worker count.
// it starts Workers instances of the Worker function.
// each worker connects to its own inbound and outbound channel using the worker index.
func (p *pipeline) Processor(
    ctx context.Context,
) error {
    // create a new context for processor stage.
    eGroup, processorCtx := errgroup.WithContext(ctx)
    // Fanning out to multiple workers...
    for index := uint(0); index < Workers; index++ {
        idx := index
        eGroup.Go(func() error {
            return p.Worker(processorCtx, idx)
        })
    }
    // wait for worker routines to finish their work.
    if err := eGroup.Wait(); err != nil {
        fmt.Printf("error in Processor: %v \n", err)

        return err
    }
    // log the end of the worker pool.
    fmt.Println("worker routines have finished executing")
    // no errors return nil.
    return nil
}

// Worker is one instance of the fanned-out processor stage.
// each worker connects to a raw stream channel and a processed channel using its index.
func (p *pipeline) Worker(
    ctx context.Context,
    index uint,
) error {
    for {
        select {
        // Context done then exit.
        case <-ctx.Done():
            fmt.Printf("worker for index %v canceled \n", index)
            // exit the worker.
            return ctx.Err()
        // get the data from indexed raw stream to further enhancing.
        // blocking call.
        case num, open := <-p.rawStreams[index]:
            if !open {
                // if raw stream closed, then indexed processed channel will be closed.
                close(p.processedStreams[index])
                fmt.Printf("processed stream channel %v is closed. worker exiting...\n", index)
                // exit the worker
                return nil
            }
            // multiplies the number by 2.
            enhancedData := num * 2

            select {
            // Context done exit.
            case <-ctx.Done():
                fmt.Printf("worker for index %v canceled during processing \n", index)

                return ctx.Err()
            // enhancedData will be sent to indexed processedStream.
            case p.processedStreams[index] <- enhancedData:
            }
        }
    }
}

// Consumer consumes data in circular buffer fashion to preserve the order of the input data.
// in a real-world service, this stage could store the processed data,
// publish a message to a topic, etc.
func (p *pipeline) Consumer(
    ctx context.Context,
) error {
    // starting with index 0
    currentIndex := uint(0)
    // collecting all the data by reading channels.
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("consumer routine canceled \n")

            return ctx.Err()
        // blocking call.
        case processedData, open := <-p.processedStreams[currentIndex]:
            if !open {
                fmt.Printf("all data has been consumed, consumer exiting... \n")

                return nil
            }

            fmt.Println("multiplied numbers output stream :", strconv.Itoa(processedData))
            // set the index to move to the next channel.
            currentIndex = (currentIndex + 1) % Workers
        }
    }
}

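Then we can create main.go. The import path below assumes the module is named sequencer (for example, created with go mod init sequencer) and that the golang.org/x/sync module has been added with go get golang.org/x/sync.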

package main

import (
    "context"
    "fmt"

    "sequencer/pipeline"
)

// Main function to start the sequenced pipeline.
func main() {
    // create a parent context for the pipeline.
    ctx, cancel := context.WithCancel(context.Background())
    // cancel the context to signal background tasks to exit.
    defer cancel()
    // initiate the pipeline.
    sequencedPipeline := pipeline.NewPipeline(ctx)
    // start the pipeline.
    if err := sequencedPipeline.Start(); err != nil {
        fmt.Println("pipeline execution failed:", err)
    }
}

We can inspect the logs to find out how the tasks were distributed and whether the data was in order at the end.

First, let's check the output order by running main.go and piping the output to the grep command to review the logs. If you'd like, you can also write the output to a file and check the order of the numbers there.

go run main.go | grep "multiplied numbers output stream :"

As seen in the screenshot above, the multiplied numbers were output in the correct order.

Let's run main.go with a modified bash command to check the logs and see how tasks are distributed among the raw stream channels.

for i in {0..9}; do echo "raw stream $i count: $(go run main.go | grep "raw stream $i" | wc -l)"; done

The screenshot above shows how the 100 numbers were distributed among the 10 channels, so each worker is assigned 10 records, keeping the distribution fair and efficient.

Now that we have a good understanding of performance, task distribution, and data sequencing in pipeline programs, it’s time to design real-world services using these concurrency patterns !!!

Service Designs

Near real time event processor service

Nowadays, lots of modern applications use system design patterns like event-driven architecture and event sourcing.

Event-driven architecture (EDA) is an architecture pattern where components communicate by emitting and reacting to events. When something happens (an event occurs), it triggers responses in other parts of the system asynchronously.

Event Sourcing is a data storage pattern where state changes in an application are captured as a sequence of immutable events. Instead of storing only the latest state of an entity (like a traditional database), an event-sourced system keeps a complete history of all changes.

Each change is stored as an event, and the current state is derived by replaying these events in order.
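Replaying events to derive state can be sketched as follows. The Event type, its "deposited" and "withdrawn" kinds, and the account balance domain are all invented for this illustration.

```go
package main

import "fmt"

// Event is a hypothetical immutable record of one balance change.
type Event struct {
	Kind   string // "deposited" or "withdrawn"
	Amount int
}

// Replay derives the current balance by applying the events in order.
func Replay(events []Event) int {
	balance := 0
	for _, e := range events {
		switch e.Kind {
		case "deposited":
			balance += e.Amount
		case "withdrawn":
			balance -= e.Amount
		}
	}
	return balance
}

func main() {
	history := []Event{
		{Kind: "deposited", Amount: 100},
		{Kind: "withdrawn", Amount: 30},
		{Kind: "deposited", Amount: 5},
	}
	fmt.Println(Replay(history)) // prints 75
}
```

The store never holds the number 75 itself; it holds only the three events, and the balance is recomputed from them whenever it is needed.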

Let’s say we have to read from an event store, make some modifications, and send the events right away to another destination. We can design a near-real-time logging system using the concurrency patterns above.

Our pipeline producer can read from the event store in batches and feed the data to the pipeline sequentially. Workers can enhance the event data. The consumer is responsible for collecting the data and sending it to the appropriate destinations.

Then we can update the latest retrieved checkpoint in the store, so the producer can continue with the next batch.
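The read, process, and checkpoint loop might look like the sketch below. Everything here is an assumption made for illustration: Store is an in-memory stand-in for a real event store, the handle callback stands in for one run of the sequenced pipeline, and size is assumed to be greater than zero.

```go
package main

import "fmt"

// Store is a hypothetical in-memory event store with a saved checkpoint.
type Store struct {
	events     []string
	checkpoint int // index of the next unread event
}

// NextBatch returns up to size events starting at the checkpoint.
func (s *Store) NextBatch(size int) []string {
	end := s.checkpoint + size
	if end > len(s.events) {
		end = len(s.events)
	}
	return s.events[s.checkpoint:end]
}

// Commit advances the checkpoint after a batch has been fully delivered.
func (s *Store) Commit(n int) {
	s.checkpoint += n
}

// Drain processes the store batch by batch and returns the number of batches.
func Drain(s *Store, size int, handle func([]string)) int {
	batches := 0
	for {
		batch := s.NextBatch(size)
		if len(batch) == 0 {
			return batches
		}
		handle(batch)        // stands in for one pipeline run
		s.Commit(len(batch)) // only now is it safe to move the checkpoint
		batches++
	}
}

func main() {
	s := &Store{events: []string{"a", "b", "c", "d", "e"}}
	n := Drain(s, 2, func(b []string) { fmt.Println("processing", b) })
	fmt.Println("batches:", n, "checkpoint:", s.checkpoint)
}
```

Committing only after the batch has been handled means a crash in the middle of a batch leads to that batch being read again rather than skipped.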

Batch processor service

If we don’t have to worry about the order of the data, we can scale our pipeline application at the infrastructure level to process multiple batches at once.

Let’s say we have to process a large amount of data at the end of the day, and it’s time-consuming. First we need to divide the total data count into batches. That way we can distribute the workload among a worker group.

We can develop an API to calculate the total number of batches and construct batch messages. This batch API can send the messages to a topic or queue, where they can be consumed and processed by multiple batch processor applications.

Processors can store the enhanced data in a database or send it to other locations.

Note: If a batch fails because of an operational error, we can maintain something like a dead letter queue to hold the failed message and retry it later with exponential backoff.

Here is a basic example of how we might construct a batch message.

{
  "messageType": "batch/process-batch",
  "data": {
    "id": "0d1bcf5d-88fe-43ef-8cad-dc11ab5da0d3",
    "batchIdentifier": "BATCH_DATA_2025-02-13_12:00:00_2025-02-14_23:59:59",
    "batchNumber": 85,
    "batchType": "BEER",
    "batchSize": 500,
    "startDate": "2025-02-13 12:00:00 +0000 UTC",
    "endDate": "2025-02-14 23:59:59 +0000 UTC",
    "offset": 42000,
    "isLastBatch": false
  }
}
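A batch API could build such messages along these lines. This is a sketch under assumptions: the structs cover only some of the fields shown above, BuildBatches is a hypothetical helper, and batch numbers are assumed to start at 1, which matches the sample (offset 42000 with size 500 gives batch 85).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BatchData mirrors a subset of the "data" object in the message above.
type BatchData struct {
	BatchNumber int  `json:"batchNumber"`
	BatchSize   int  `json:"batchSize"`
	Offset      int  `json:"offset"`
	IsLastBatch bool `json:"isLastBatch"`
}

// BatchMessage is the envelope that would be sent to the topic or queue.
type BatchMessage struct {
	MessageType string    `json:"messageType"`
	Data        BatchData `json:"data"`
}

// BuildBatches splits total records into messages of at most size records.
func BuildBatches(total, size int) []BatchMessage {
	var msgs []BatchMessage
	for offset, number := 0, 1; offset < total; offset, number = offset+size, number+1 {
		current := size
		if remaining := total - offset; remaining < size {
			current = remaining // the last batch may be smaller
		}
		msgs = append(msgs, BatchMessage{
			MessageType: "batch/process-batch",
			Data: BatchData{
				BatchNumber: number,
				BatchSize:   current,
				Offset:      offset,
				IsLastBatch: offset+size >= total,
			},
		})
	}
	return msgs
}

func main() {
	msgs := BuildBatches(1200, 500) // three batches: 500, 500, 200
	out, err := json.Marshal(msgs[len(msgs)-1])
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(len(msgs), string(out))
}
```

Each batch processor needs only the offset and batch size to fetch its slice of the data, so the messages can be consumed in any order.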

Conclusion

In conclusion, concurrency in Go offers powerful patterns for efficient task execution. Through the Pipeline and Fan-Out patterns, we demonstrated how to process computationally intensive tasks in a structured and scalable manner.

By distributing work across multiple goroutines, we can achieve significant performance gains, making our programs more efficient.

These concurrency techniques are essential for designing high-performance data processing systems, and by mastering them, developers can build scalable and robust Go applications suited for real-world workloads.
