Understanding Fan-Out Concurrency Pattern in Go

Yusuf Akinleye

The Fan-Out Pattern

Welcome back, everyone! Two weeks ago, we started our concurrency series by discussing how concurrency patterns can effectively address business challenges. I illustrated a scenario where the Wait for Results approach is relevant in our daily lives, such as students taking exams or managing work tasks concurrently without interference. Today, let's elevate our understanding with the Fan-Out Pattern, a powerful strategy for transforming daunting workloads into parallelized successes.

So, what is the Fan-Out Pattern? Taking it up from where I stopped on the exam invigilation scenario, let’s say the exam is over, and you’ve collected all the answer sheets. Your next task is to grade them. Instead of grading all the papers yourself, you distribute them among teaching assistants (TAs) under your supervision. Each TA grades a subset, slashing your workload and speeding things up.

The Fan-Out pattern involves a single producer distributing tasks to multiple workers, enabling parallel processing for efficiency and scalability.

Real-Life Scenario: Grading Papers with Assistants

Here’s how the Fan-Out pattern applies to our exam scenario:

  1. You (the invigilator) are the producer, distributing papers to TAs.

  2. Each TA (worker) processes the papers concurrently.

  3. Once all papers are graded, you collect the results.

How It Works in Go

You can model this scenario in Go using sync.WaitGroup. Instead of just showing the code, I encourage you to try it in the Go Playground. Click the link below to run the code and see the results in real-time:

Go Playground: Fan-Out Example

package main

import (
    "fmt"
    "sync"
    "time"
)

// Paper represents an exam paper with a unique ID
type Paper struct {
    ID int
}

func main() {
    // create a buffered channel to hold 10 students' papers,
    // the tasks that need to be processed.
    papers := make(chan Paper, 10)

    // waitGroup to synchronize goroutines.
    var wg sync.WaitGroup

    // create 3 teaching assistants (TAs) as workers
    // to handle the tasks.
    for i := 0; i < 3; i++ {
        // increment the waitGroup counter
        wg.Add(1)

        // launch the TA worker function
        go gradePapers(i, &wg, papers)
    }

    // send 10 papers to the channel
    for i := 0; i < 10; i++ {
        papers <- Paper{ID: i}
    }

    // close the channel to signal that no more papers will be sent
    close(papers)

    // wait for all TAs to finish grading
    wg.Wait()

    fmt.Println("all papers graded!")
}

// gradePapers represents a teaching assistant (TA) grading papers
func gradePapers(taID int, wg *sync.WaitGroup, papers <-chan Paper) {
    // decrement the counter when the TA finishes
    defer wg.Done()

    // process papers from the channel
    for paper := range papers {
        fmt.Printf("TA %d grading paper %d\n", taID, paper.ID)

        // simulate the grading time for each paper (2 seconds)
        time.Sleep(time.Second * 2)
    }
}

What’s Happening Here?

  1. A buffered channel (papers) is used to send tasks (papers) to workers (TAs).

  2. Each teaching assistant (TA) reads from the channel and processes the papers concurrently.

  3. close(papers) signals that no more tasks will be sent.

  4. wg.Wait() ensures the program waits for all TAs to finish grading.
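
As a quick aside, the reason close(papers) lets each TA's for ... range loop exit is Go's channel-close semantics: values already in the buffer can still be received after the close, and once a closed channel is drained, a receive reports ok == false. A minimal sketch of this behavior (the drain helper is illustrative, not part of the example above):

```go
package main

import "fmt"

// drain receives every value from ch until the channel is
// closed and empty, returning the values in order.
func drain(ch <-chan int) []int {
	var out []int
	for v := range ch { // loop ends once ch is closed and drained
		out = append(out, v)
	}
	return out
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // buffered values survive the close

	fmt.Println(drain(ch)) // prints [1 2]

	// a closed, drained channel yields the zero value and ok == false
	v, ok := <-ch
	fmt.Println(v, ok) // prints 0 false
}
```

This is exactly what keeps the TA goroutines from blocking forever: without the close, each range loop would wait indefinitely for more papers.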

Key Takeaways

  • Use the Fan-Out pattern to distribute tasks across multiple workers.

  • Channels are the backbone of this pattern, enabling safe communication between goroutines.

  • This pattern is ideal for processing large datasets or handling multiple API requests.
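
The takeaways above can be condensed into one self-contained program. In this sketch the workers send results back on a second channel instead of just printing; the fanOutSquare name and the squaring work are hypothetical stand-ins for real processing:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanOutSquare distributes jobs across the given number of worker
// goroutines and collects each result on a shared output channel.
func fanOutSquare(jobs []int, workers int) []int {
	in := make(chan int, len(jobs))
	out := make(chan int, len(jobs))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range in { // each worker pulls jobs until in is closed
				out <- n * n
			}
		}()
	}

	for _, n := range jobs {
		in <- n
	}
	close(in) // no more jobs; workers drain the channel and exit

	wg.Wait()
	close(out)

	var results []int
	for r := range out {
		results = append(results, r)
	}
	sort.Ints(results) // completion order is nondeterministic
	return results
}

func main() {
	fmt.Println(fanOutSquare([]int{1, 2, 3, 4}, 3)) // prints [1 4 9 16]
}
```

Note the sort at the end: because workers finish in no particular order, any code that needs deterministic output has to impose an ordering itself.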

Using Wait for Results and Fan-Out Pattern in an API Endpoint

Now that you understand these two patterns, let’s apply them to a simple program. When building scalable and efficient APIs, you often need to handle multiple tasks concurrently and ensure they all complete before returning a response. This is exactly where the Fan-Out and Wait for Results patterns come into play.

Practical Example: Batch Image Processing

Let’s tie everything together with a real-world example. Imagine you’re building an API endpoint that processes a batch of images. In a batch image processing API, the endpoint takes in image IDs, distributes the processing tasks among multiple goroutines (Fan-Out), and ensures all tasks are completed before returning the results (Wait for Results). This method improves scalability and efficiency in managing API requests.

To illustrate, consider creating an API endpoint for processing a batch of images.

Your goals are to:

  1. Distribute tasks across multiple goroutines to process them in parallel. This is useful for efficiently handling large datasets or numerous requests (Fan-Out).

  2. Ensure all tasks are completed by using synchronization techniques like sync.WaitGroup before returning the results (Wait for Results).

Here’s how you can implement this in Go:

package main

import (
    "fmt"
    "net/http"
    "sync"
)

// processImage simulates image processing and returns a result message.
func processImage(imgID int) string {
    return fmt.Sprintf("image %d processed", imgID)
}

// fanOut starts a goroutine for each image processing task
// and sends results to a channel.
func fanOut(images []int, results chan<- string, wg *sync.WaitGroup) {
    for _, imgID := range images {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            results <- processImage(id)
        }(imgID) // pass imgID explicitly to avoid closure issues
    }
}

// waitForResults waits for all goroutines to finish
// and then closes the results channel.
func waitForResults(wg *sync.WaitGroup, results chan string) {
    go func() {
        wg.Wait()
        close(results)
    }()
}

// batchTaskHandler handles HTTP requests to process a batch of images concurrently.
func batchTaskHandler(w http.ResponseWriter, r *http.Request) {
    // let's assume these image IDs are extracted from the request
    images := []int{1, 2, 3, 4, 5}

    // create a buffered channel to store processing results
    results := make(chan string, len(images))

    // waitGroup to synchronize goroutines
    var wg sync.WaitGroup

    // launch image processing goroutines
    fanOut(images, results, &wg)

    // wait for all processing to complete
    waitForResults(&wg, results)

    // collect processed results
    var processed []string
    for result := range results {
        processed = append(processed, result)
    }

    // send response with the processed results
    fmt.Fprintf(w, "results: %v", processed)
}

// main sets up the HTTP server and registers the batch processing handler.
func main() {
    http.HandleFunc("/process", batchTaskHandler)
    err := http.ListenAndServe(":8080", nil)
    if err != nil && err != http.ErrServerClosed {
        panic(err)
    }
}
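
The comment in fanOut about "closure issues" refers to a well-known Go pitfall: before Go 1.22, a for-loop variable was shared across iterations, so goroutines that captured it directly could all observe its final value. Passing the variable as an argument gives each goroutine its own copy. A small sketch of the safe pattern (collectIDs is an illustrative name, not part of the code above):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collectIDs launches one goroutine per ID, passing the loop
// variable as an argument so each goroutine receives its own copy
// (required before Go 1.22; harmless after).
func collectIDs(ids []int) []int {
	out := make(chan int, len(ids))
	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go func(id int) { // id is a per-goroutine copy, not the shared loop variable
			defer wg.Done()
			out <- id
		}(id)
	}
	wg.Wait()
	close(out)

	var got []int
	for v := range out {
		got = append(got, v)
	}
	sort.Ints(got) // goroutines complete in arbitrary order
	return got
}

func main() {
	fmt.Println(collectIDs([]int{1, 2, 3, 4, 5})) // prints [1 2 3 4 5]
}
```

With the shared-variable bug, several goroutines could have reported the same ID; the argument-passing form guarantees each ID is processed exactly once.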

How It Works:

  1. The /process endpoint receives a batch of image IDs.

  2. It fans out the processing to multiple goroutines.

  3. A sync.WaitGroup ensures all tasks are completed before the results are returned.

To run this yourself, create a main.go file in your favorite text editor or IDE (e.g., VS Code, GoLand, or Sublime Text) and paste the code above into it. Then:

  1. Start the Server: In your terminal, navigate to the project directory and run the following command:

     go run main.go
    

    This will start an HTTP server on port 8080.

  2. Test the Endpoint: Open your browser or use a tool like curl or Postman to send a request to the server:

     curl http://localhost:8080/process

  3. Expected Output: You should see a response like the one below. The order of the results may differ on each request, since the goroutines finish in a nondeterministic order.

     results: [image 1 processed image 2 processed image 3 processed image 4 processed image 5 processed]
    

Understanding the Code

Let’s break down what’s happening in the code:

  1. processImage Function:

    • Simulates image processing by returning a formatted string.

    • This is where you would add actual image processing logic in a real-world application.

  2. fanOut Function:

    • Distributes the image processing tasks across multiple goroutines.

    • Uses a sync.WaitGroup to track the number of active goroutines.

  3. waitForResults Function:

    • Waits for all goroutines to finish using wg.Wait().

    • Closes the results channel to signal that no more results will be sent.

  4. batchTaskHandler Function:

    • Handles HTTP requests to process a batch of images.

    • It uses the Fan-Out pattern to distribute tasks and the Wait for Results pattern to collect the results.

  5. main Function:

    • Sets up an HTTP server and registers the batchTaskHandler for the /process endpoint.

Conclusion

The Wait for Results and Fan-Out patterns are essential tools for building concurrent systems in Go. Whether you're invigilating an exam, grading papers, or processing API requests, these patterns help you manage tasks efficiently and at scale.

Combining these patterns allows you to create systems that easily handle high workloads. So the next time you’re juggling multiple tasks, remember: Go’s concurrency primitives have your back!

P.S. If you enjoyed this article, stay tuned for the next part of the series, where we’ll explore more advanced concurrency patterns like Bounded Fan-Out and Cancellation. Happy coding, and may your goroutines always run in harmony! 🚀


Written by

Yusuf Akinleye

Software Engineer and Technical writer