Understanding Worker Pools in Go

Introduction
Golang makes it very simple to apply concurrency in our programs, and the worker pool is one of the amazing concurrency patterns we can easily build with Go.
Definition
The technical definition is: (Yes, I copied this from the internet 🙂)
A worker pool is a software design pattern where a set of worker threads or processes (the "pool") are created to concurrently execute tasks from a queue
In simple Golang terms, a worker pool lets us run multiple tasks (jobs) concurrently using a fixed number of goroutines (workers) that pull tasks from a shared channel (acting as a queue), without needing to create a new goroutine for each task. This way we run tasks efficiently with less resource usage.
Why Worker Pools?
A worker pool makes it possible to run a huge number of tasks on limited hardware resources.
For example, to hit 100 URLs with concurrent GET requests, we can simply run a goroutine for each call. It's simple in Golang:
package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

type Result struct {
	Response *http.Response
	Error    error
}

func request(client *http.Client, url string, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()
	res, err := client.Get(url)
	results <- Result{res, err}
}

func main() {
	urls := []string{
		// 100 urls from https://gist.github.com/demersdesigns/4442cd84c1cc6c5ccda9b19eac1ba52b
	}
	results := make(chan Result, len(urls))
	wg := sync.WaitGroup{}
	client := http.Client{
		Timeout: 5 * time.Second,
	}
	wg.Add(len(urls))
	// starting a goroutine for each url
	for _, v := range urls {
		go request(&client, v, results, &wg)
	}
	// collecting and printing results
	for i := 0; i < len(urls); i++ {
		res := <-results
		if res.Error != nil {
			fmt.Printf("error in request: %s\n", res.Error.Error())
		} else {
			fmt.Printf("request success for %s\n", res.Response.Request.URL)
		}
	}
	wg.Wait()
}
Here, we have created a urls list that holds 100 URL strings. Then we make a channel of the Result struct, which holds the response and error. Next we create a WaitGroup wg to wait for the goroutines to complete, and an HTTP client with a 5-second timeout. Then we start 100 goroutines in a range loop using the request function.
Inside this function, we first defer wg.Done() and make the GET request, then send the Result to the results channel.
Simultaneously, in the main program, we loop urls-length times, receive each result from the results channel, and print it.
This program is fast at first look and also very simple, but what if we want to do the same thing with, say, 1,000, 10,000, or even 100,000 URLs? Spawning a goroutine per task can easily max out the system resources for a huge dataset at scale.
But it can be improved. This is where worker pools can help!
Worker Pool
A worker pool allows us to run a fixed number of goroutines, where each goroutine can make multiple GET calls picked from the shared queue channel.
package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

type Job struct {
	URL string
}

type JobResult struct {
	Response *http.Response
	Error    error
}

func workers(client *http.Client, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
	for job := range jobs {
		res, err := client.Get(job.URL)
		results <- JobResult{res, err}
		wg.Done()
	}
}

func main() {
	urls := []string{
		// 100 urls from https://gist.github.com/demersdesigns/4442cd84c1cc6c5ccda9b19eac1ba52b
	}
	numWorkers := 10
	client := http.Client{
		Timeout: 5 * time.Second,
	}
	jobs := make(chan Job, len(urls))
	results := make(chan JobResult, len(urls))
	wg := sync.WaitGroup{}
	// starting a fixed number of worker goroutines
	for i := 0; i < numWorkers; i++ {
		go workers(&client, jobs, results, &wg)
	}
	wg.Add(len(urls))
	// sending jobs to the job queue
	for i := 0; i < len(urls); i++ {
		jobs <- Job{urls[i]}
	}
	close(jobs)
	// collecting and printing results
	for i := 0; i < len(urls); i++ {
		res := <-results
		if res.Error != nil {
			fmt.Println("error: ", res.Error.Error())
		} else {
			fmt.Println(res.Response.Request.URL)
		}
	}
	wg.Wait()
}
In this version, we have created a new struct Job to represent a task/job; it holds the URL.
We use a buffered channel jobs of Job with len(urls) capacity. This channel is our shared queue, from which the worker goroutines pick their tasks. The variable numWorkers specifies the total number of workers we want to deploy.
Then we deploy exactly numWorkers goroutines using the workers function. Inside this function, we range over the jobs channel to pick tasks from it. On picking a task, a worker makes the GET call, puts the JobResult in the results channel, and marks the task complete with wg.Done().
Then we add a counter to our WaitGroup and start putting jobs on the jobs channel by looping over the urls list. Then we close the channel using close(jobs) to signal the workers that there are no more tasks and they can exit their range loops.
After this, we receive results in a loop from the results channel and print them to the console.
Inside the workers function, each goroutine loops over the jobs channel and picks the next available job/task. As the workers keep picking tasks, the queue keeps shrinking, and eventually all of the tasks get picked and completed. It works like this: while worker1 picks task1, worker2 picks task2; then worker1 can go on to pick task3, so each worker can eventually execute multiple tasks.
Conclusion
So, instead of executing one task per goroutine, we can execute multiple tasks from a single goroutine. This is the main purpose of using a worker pool!
In the given examples, we run a total of 10 worker goroutines to make 100 GET calls using a worker pool, which is much more efficient than the previous example that spins up 100 goroutines for the same data.
If you find this article helpful, don't forget to hit the ❤️ button.
Check out my website here and feel free to connect.
Happy Coding! 👨💻