Comprehensive Course on Go Concurrency: Goroutines and Channels
Welcome to the comprehensive course on Goroutines and Channels in Go! This course is designed to take you from the basics to advanced concepts, ensuring you become an expert in Go's concurrency model. By the end of this course, you'll understand how to effectively utilize goroutines and channels to build efficient, concurrent applications in Go.
Table of Contents
1. Introduction to Concurrency in Go
2. Goroutines: The Building Blocks of Concurrency
3. Channels: Communication Between Goroutines
4. Channel Operations
5. Buffered vs. Unbuffered Channels
6. Channel Directionality
7. The select Statement
8. Synchronization with WaitGroups
9. Avoiding Common Concurrency Pitfalls
10. Concurrency Patterns and Best Practices
11. Advanced Topics (11.1 Context Package, 11.2 Pipelines, 11.3 Fan-In/Fan-Out, 11.4 Worker Pools)
12. Practical Examples and Projects
13. Conclusion and Next Steps
1. Introduction to Concurrency in Go
What is Concurrency?
Concurrency refers to the ability of a system to handle multiple tasks simultaneously. In programming, this means managing multiple processes or threads that can execute independently while potentially interacting with each other.
Go's Approach to Concurrency
Go provides built-in support for concurrency, making it easier to write programs that can perform multiple tasks simultaneously. Its concurrency model is based on goroutines and channels, which simplify the process of managing concurrent tasks compared to traditional threading models.
Benefits of Concurrency
Performance: Utilize multiple CPU cores to perform tasks in parallel.
Responsiveness: Improve the responsiveness of applications by performing tasks concurrently.
Resource Efficiency: Lightweight goroutines consume fewer resources compared to traditional threads.
Concurrency vs. Parallelism
Concurrency: Managing multiple tasks at the same time, potentially interleaving their execution.
Parallelism: Executing multiple tasks simultaneously on multiple processors or cores.
Go's concurrency model primarily focuses on concurrency, but it can leverage parallelism on multi-core systems.
2. Goroutines: The Building Blocks of Concurrency
What are Goroutines?
A goroutine is a lightweight thread managed by the Go runtime. Goroutines are functions or methods that run concurrently with other goroutines.
Starting a Goroutine
You can start a goroutine by using the go keyword followed by a function call.
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine!")
}
func main() {
go sayHello() // Starts a new goroutine
time.Sleep(1 * time.Second) // Wait to allow goroutine to finish
fmt.Println("Hello from main!")
}
Output:
Hello from goroutine!
Hello from main!
Key Characteristics of Goroutines
Lightweight: Goroutines have a small initial stack (around 2KB) that grows and shrinks dynamically, allowing thousands of goroutines to run concurrently.
Managed by Go Runtime: The Go scheduler multiplexes goroutines onto OS threads, abstracting away the complexity of thread management.
Concurrent Execution: Goroutines execute independently and can communicate via channels.
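To get a feel for how cheap goroutines are, the following sketch launches 10,000 of them and waits for each to signal completion over a channel (the count is arbitrary):

```go
package main

import "fmt"

// spawn launches n goroutines and waits for each to signal completion.
func spawn(n int) int {
	done := make(chan struct{})
	for i := 0; i < n; i++ {
		go func() {
			done <- struct{}{} // signal completion
		}()
	}
	finished := 0
	for i := 0; i < n; i++ {
		<-done // drain one signal per goroutine
		finished++
	}
	return finished
}

func main() {
	fmt.Println(spawn(10000), "goroutines finished")
}
```

Launching the same number of OS threads would be far more expensive; here the whole program completes almost instantly.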
Example: Goroutines with Anonymous Functions
Goroutines can also be started with anonymous functions.
package main
import (
"fmt"
"time"
)
func main() {
go func() {
fmt.Println("Hello from anonymous goroutine!")
}()
time.Sleep(1 * time.Second)
fmt.Println("Hello from main!")
}
Output:
Hello from anonymous goroutine!
Hello from main!
Synchronizing Goroutines
In the above examples, time.Sleep is used to wait for the goroutine to finish. However, this is not a reliable method for synchronization. We'll explore better synchronization techniques, such as WaitGroups, later in this course.
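As a preview of a more reliable approach, a channel by itself can already replace time.Sleep: main blocks until the goroutine signals that it is done. A minimal sketch:

```go
package main

import "fmt"

// greet runs work in a goroutine and blocks until it signals completion.
func greet() string {
	done := make(chan struct{})
	msg := ""
	go func() {
		msg = "Hello from goroutine!"
		close(done) // closing the channel signals completion to any receiver
	}()
	<-done // blocks until the goroutine is finished; no Sleep needed
	return msg
}

func main() {
	fmt.Println(greet())
	fmt.Println("Hello from main!")
}
```

Unlike Sleep, this waits exactly as long as necessary, no matter how fast or slow the goroutine runs.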
3. Channels: Communication Between Goroutines
What are Channels?
Channels are Go's way of allowing goroutines to communicate with each other and synchronize their execution. They provide a safe way to send and receive data between goroutines, ensuring that data is exchanged without race conditions.
Creating a Channel
Use the make function to create a channel.
ch := make(chan int)
This creates a channel of type int.
Sending and Receiving on Channels
Send: Use the <- operator to send data into a channel.
Receive: Use the <- operator to receive data from a channel.
package main
import (
"fmt"
)
func main() {
ch := make(chan string)
// Sender goroutine
go func() {
ch <- "Hello from channel!"
}()
// Receiver
msg := <-ch
fmt.Println(msg)
}
Output:
Hello from channel!
Unidirectional Channels
Channels can be bidirectional (default) or unidirectional (send-only or receive-only). Unidirectional channels are useful for enforcing communication directionality in your code.
Send-Only: chan<- Type
Receive-Only: <-chan Type
func send(ch chan<- string, msg string) {
ch <- msg
}
func receive(ch <-chan string) string {
return <-ch
}
Buffered vs. Unbuffered Channels
Unbuffered Channels: No capacity to hold values; send and receive operations block until the other side is ready.
Buffered Channels: Have a capacity to hold a certain number of values; send operations block only when the buffer is full, and receive operations block only when the buffer is empty.
We'll discuss this in detail in the next section.
4. Channel Operations
Sending Data to a Channel
Use the <- operator to send data.
ch <- value
Example:
ch := make(chan int)
go func() {
ch <- 42
}()
fmt.Println(<-ch) // Outputs: 42
Receiving Data from a Channel
Use the <- operator to receive data.
value := <-ch
Example:
ch := make(chan int)
go func() {
ch <- 42
}()
value := <-ch
fmt.Println(value) // Outputs: 42
Closing a Channel
Use the close function to close a channel. This indicates that no more values will be sent on the channel.
close(ch)
Note: Only the sender should close a channel, never the receiver.
Checking if a Channel is Closed
When receiving from a closed channel, the operation returns the zero value of the channel's type and a boolean that is false once the channel is closed and drained of values.
value, ok := <-ch
if !ok {
fmt.Println("Channel closed!")
}
Example:
ch := make(chan int)
close(ch)
value, ok := <-ch
fmt.Println(value, ok) // Outputs: 0 false
Range over Channels
You can use a for range loop to receive values from a channel until it is closed.
ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}()
for val := range ch {
fmt.Println(val)
}
Output:
0
1
2
3
4
Select Statement
The select statement allows a goroutine to wait on multiple communication operations.
select {
case msg1 := <-ch1:
fmt.Println("Received", msg1)
case ch2 <- msg2:
fmt.Println("Sent", msg2)
default:
fmt.Println("No communication")
}
We'll explore select in detail in later sections.
5. Buffered vs. Unbuffered Channels
Unbuffered Channels
Unbuffered channels have no capacity to store values. Both sender and receiver must be ready to perform the communication.
Characteristics:
Send operations block until a receiver is ready.
Receive operations block until a sender sends a value.
Use Cases:
Synchronization between goroutines.
Ensuring ordering of operations.
Example:
package main
import (
"fmt"
)
func main() {
ch := make(chan int) // Unbuffered channel
go func() {
ch <- 10 // Blocks until receiver is ready
fmt.Println("Sent 10")
}()
value := <-ch // Blocks until value is sent
fmt.Println("Received", value)
}
Output:
Received 10
Sent 10
Note: "Sent 10" is not guaranteed to appear; main may exit before the sender goroutine reaches its Println.
Buffered Channels
Buffered channels have a capacity to store a specified number of values without blocking.
Characteristics:
Send operations block only when the buffer is full.
Receive operations block only when the buffer is empty.
Allows decoupling between sender and receiver to some extent.
Syntax:
ch := make(chan Type, capacity)
Example:
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 2) // Buffered channel with capacity 2
ch <- 1
ch <- 2
fmt.Println("Sent two values")
// The third send would block since the buffer is full
// go func() { ch <- 3 }()
fmt.Println(<-ch)
fmt.Println(<-ch)
}
Output:
Sent two values
1
2
When to Use Buffered Channels:
When you need to allow some decoupling between sender and receiver.
To improve performance by reducing the number of blocking operations.
When implementing producer-consumer patterns where producers can produce ahead of consumers.
Caveats:
Overuse of buffered channels can lead to increased memory usage.
Managing buffer sizes requires careful consideration to prevent deadlocks or memory leaks.
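When sizing buffers, note that the built-in len and cap also work on channels: len reports how many values are currently buffered, cap the buffer capacity. A quick sketch:

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	fmt.Println(len(ch), cap(ch)) // 2 buffered values, capacity 3
	<-ch
	fmt.Println(len(ch), cap(ch)) // 1 buffered value, capacity 3
}
```

Inspecting len(ch) can help when debugging why a send blocks, though relying on it for program logic is racy, since another goroutine can change it between the check and the send.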
6. Channel Directionality
Understanding Directionality
Channels in Go can be bidirectional or unidirectional. Specifying directionality helps in enforcing communication patterns and improving code clarity.
Bidirectional Channel: Can both send and receive.
Send-Only Channel: Can only send data.
Receive-Only Channel: Can only receive data.
Declaring Channel Direction
When passing channels to functions, you can specify their direction.
Send-Only:
func sendData(ch chan<- int, value int) {
ch <- value
}
Receive-Only:
func receiveData(ch <-chan int) int {
return <-ch
}
Bidirectional (Default):
func process(ch chan int) {
ch <- 1
value := <-ch
}
Example: Using Unidirectional Channels
package main
import (
"fmt"
)
func sendData(ch chan<- string, msg string) {
ch <- msg
}
func receiveData(ch <-chan string) {
msg := <-ch
fmt.Println("Received:", msg)
}
func main() {
ch := make(chan string)
go sendData(ch, "Hello, Go!")
receiveData(ch)
}
Output:
Received: Hello, Go!
Benefits of Using Unidirectional Channels
Safety: Prevents accidental misuse by enforcing communication direction.
Clarity: Makes the code easier to understand by specifying intent.
Encapsulation: Helps in designing better APIs by controlling how channels are used.
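This safety is enforced at compile time: receiving from a send-only channel (or sending on a receive-only one) is rejected by the compiler. A sketch, with the illegal operation left in a comment:

```go
package main

import "fmt"

// produce may only send on ch; the compiler rejects any receive from it.
func produce(ch chan<- int) {
	ch <- 7
	// _ = <-ch // compile error: cannot receive from send-only channel
}

func main() {
	ch := make(chan int, 1)
	produce(ch) // a bidirectional channel converts implicitly to chan<- int
	fmt.Println(<-ch)
}
```

The conversion only goes one way: a bidirectional channel can be passed where a unidirectional one is expected, never the reverse.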
7. The select Statement
What is the select Statement?
The select statement lets a goroutine wait on multiple communication operations (channel sends or receives). It's similar to the switch statement but specifically designed for channels.
Basic Syntax
select {
case <-ch1:
// Handle ch1 receive
case ch2 <- value:
// Handle ch2 send
default:
// Handle default case if no other case is ready
}
How select Works
select blocks until at least one of its cases can proceed.
If multiple cases are ready, one is chosen at random.
The default case executes immediately if no other case is ready, preventing blocking.
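The random choice among ready cases is easy to observe: if both channels are pre-filled before select runs, repeated iterations land on each case roughly half the time. A small sketch (the iteration count is arbitrary):

```go
package main

import "fmt"

// pick runs n selects where both cases are ready and counts each choice.
func pick(n int) (a, b int) {
	for i := 0; i < n; i++ {
		ca := make(chan int, 1)
		cb := make(chan int, 1)
		ca <- 1 // both cases are ready before the select runs
		cb <- 1
		select {
		case <-ca:
			a++
		case <-cb:
			b++
		}
	}
	return
}

func main() {
	a, b := pick(1000)
	fmt.Println("case a chosen", a, "times; case b chosen", b, "times")
}
```

This randomness prevents starvation: no ready case is systematically preferred over another.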
Example: Using select for Multiple Channels
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
// Goroutine sending to ch1 after 1 second
go func() {
time.Sleep(1 * time.Second)
ch1 <- "Message from ch1"
}()
// Goroutine sending to ch2 after 2 seconds
go func() {
time.Sleep(2 * time.Second)
ch2 <- "Message from ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
}
Output:
Message from ch1
Message from ch2
Using select with the default Case
The default case allows select to proceed without blocking if no other case is ready.
select {
case msg := <-ch:
fmt.Println("Received:", msg)
default:
fmt.Println("No message received")
}
Example:
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
select {
case msg := <-ch:
fmt.Println("Received:", msg)
default:
fmt.Println("No message received")
}
}
Output:
No message received
Using select with time.After
You can use select with time.After to implement timeouts.
select {
case msg := <-ch:
fmt.Println("Received:", msg)
case <-time.After(2 * time.Second):
fmt.Println("Timeout!")
}
Example:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch <- "Delayed message"
}()
select {
case msg := <-ch:
fmt.Println("Received:", msg)
case <-time.After(2 * time.Second):
fmt.Println("Timeout!")
}
}
Output:
Timeout!
Select with Multiple Cases
select {
case msg1 := <-ch1:
fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
fmt.Println("Received from ch2:", msg2)
case ch3 <- msg3:
fmt.Println("Sent to ch3:", msg3)
default:
fmt.Println("No communication")
}
Note: Use select judiciously to avoid complexity and ensure readability.
8. Synchronization with WaitGroups
What are WaitGroups?
WaitGroups are used to wait for a collection of goroutines to finish executing. They provide a way to synchronize concurrent operations, ensuring that the main function waits until all goroutines have completed.
Using sync.WaitGroup
The sync.WaitGroup type from the sync package provides the necessary functionality.
Basic Operations
Add(delta int): Increments the WaitGroup counter by delta, the number of goroutines to wait for.
Done(): Decrements the WaitGroup counter by one.
Wait(): Blocks until the WaitGroup counter is zero.
Example: Waiting for Goroutines to Finish
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers done")
}
Output:
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 1 done
Worker 2 done
Worker 3 done
All workers done
Common Patterns
Adding to the WaitGroup Before Starting Goroutines
Ensure you call wg.Add(1) before starting the goroutine to prevent race conditions.
wg.Add(1)
go func() {
defer wg.Done()
// Do work
}()
Deferring wg.Done()
Use defer wg.Done() at the beginning of the goroutine to ensure it is called even if the goroutine panics.
Waiting in the Main Goroutine
The main goroutine should call wg.Wait() to block until all other goroutines have called wg.Done().
Example: Concurrent Downloads with WaitGroup
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func downloadFile(file string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Downloading %s...\n", file)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
fmt.Printf("Finished downloading %s\n", file)
}
func main() {
rand.Seed(time.Now().UnixNano())
files := []string{"file1.txt", "file2.jpg", "file3.pdf"}
var wg sync.WaitGroup
for _, file := range files {
wg.Add(1)
go downloadFile(file, &wg)
}
wg.Wait()
fmt.Println("All downloads completed.")
}
Sample Output:
Downloading file1.txt...
Downloading file2.jpg...
Downloading file3.pdf...
Finished downloading file2.jpg
Finished downloading file1.txt
Finished downloading file3.pdf
All downloads completed.
9. Avoiding Common Concurrency Pitfalls
Concurrency introduces complexity, and it's easy to introduce bugs if not handled carefully. Here are some common pitfalls and how to avoid them.
1. Race Conditions
Race conditions occur when multiple goroutines access shared data concurrently without proper synchronization.
Solution:
Use channels to communicate and synchronize access.
Use mutexes (sync.Mutex) to protect shared data.
Use atomic operations (sync/atomic) for simple cases.
Example of Race Condition:
package main
import (
"fmt"
"sync"
)
func main() {
var counter int
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
counter++
wg.Done()
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // May not be 1000
}
Solution with Mutex:
package main
import (
"fmt"
"sync"
)
func main() {
var counter int
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
mu.Lock()
counter++
mu.Unlock()
wg.Done()
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // Always 1000
}
2. Deadlocks
A deadlock occurs when goroutines are waiting indefinitely for each other, preventing progress.
Common Causes:
Improper use of channels (e.g., both sender and receiver waiting).
Not closing channels when expected.
Circular dependencies.
Example of Deadlock:
package main
func main() {
ch := make(chan int)
ch <- 1 // Blocks forever since no receiver
}
Solution:
Ensure that for every send, there is a corresponding receive, and vice versa.
package main
import "fmt"
func main() {
ch := make(chan int)
go func() {
ch <- 1
}()
fmt.Println(<-ch)
}
3. Goroutine Leaks
A goroutine leak happens when goroutines are left running without a way to terminate, often due to blocking operations.
Solution:
Use select with done channels or context cancellation to signal goroutines to exit.
Ensure all goroutines have a clear exit path.
Example of Goroutine Leak:
package main
func main() {
ch := make(chan int)
go func() {
for {
<-ch // Never exits
}
}()
}
Solution with a done Channel:
package main
import (
"fmt"
)
func worker(ch <-chan int, done <-chan bool) {
for {
select {
case val := <-ch:
fmt.Println("Received:", val)
case <-done:
fmt.Println("Exiting goroutine")
return
}
}
}
func main() {
ch := make(chan int)
done := make(chan bool)
go worker(ch, done)
ch <- 1
ch <- 2
done <- true
}
Output:
Received: 1
Received: 2
Exiting goroutine
Note: because main can exit immediately after done <- true completes, the final line is not guaranteed to print on every run.
4. Improper Channel Closure
Closing a channel incorrectly can lead to panics or unexpected behavior.
Rules for Closing Channels:
Only the sender should close a channel.
Never close a channel from the receiver side.
Do not close a channel multiple times.
Receivers should handle closed channels gracefully.
Example of Improper Closure:
package main
func main() {
ch := make(chan int)
close(ch)
close(ch) // Panic: close of closed channel
}
Proper Closure:
package main
import "fmt"
func main() {
ch := make(chan int)
go func() {
ch <- 1
ch <- 2
close(ch)
}()
for val := range ch {
fmt.Println(val)
}
}
Output:
1
2
10. Concurrency Patterns and Best Practices
1. Worker Pool Pattern
A worker pool limits the number of concurrent goroutines performing a task, improving resource management and preventing overload.
Example:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(time.Second)
}
}
func main() {
const numWorkers = 3
jobs := make(chan int, 5)
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, &wg)
}
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
wg.Wait()
fmt.Println("All jobs processed.")
}
Output:
Worker 1 processing job 1
Worker 2 processing job 2
Worker 3 processing job 3
Worker 1 processing job 4
Worker 2 processing job 5
All jobs processed.
2. Pipeline Pattern
A pipeline allows data to flow through a series of stages, each processing the data and passing it to the next stage.
Example:
package main
import (
"fmt"
)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
nums := generator(2, 3, 4, 5)
squares := square(nums)
for sq := range squares {
fmt.Println(sq)
}
}
Output:
4
9
16
25
3. Fan-In and Fan-Out
Fan-Out: Distributing work across multiple goroutines.
Fan-In: Merging multiple channels into one.
Example:
package main
import (
"fmt"
"sync"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
results <- job * 2
}
}
func main() {
jobs := make(chan int, 5)
results := make(chan int, 5)
var wg sync.WaitGroup
// Start 3 workers
for w := 1; w <= 3; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// Send jobs
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// Wait for workers to finish
go func() {
wg.Wait()
close(results)
}()
// Collect results
for res := range results {
fmt.Println("Result:", res)
}
}
Output:
Worker 1 processing job 1
Worker 2 processing job 2
Worker 3 processing job 3
Worker 1 processing job 4
Worker 2 processing job 5
Result: 2
Result: 4
Result: 6
Result: 8
Result: 10
4. Rate Limiting
Control the rate at which events are processed using channels and time.Ticker
.
Example:
package main
import (
"fmt"
"time"
)
func main() {
jobs := make(chan int)
done := make(chan bool)
// Rate limiter: one job per second
limiter := time.Tick(1 * time.Second)
go func() {
for j := range jobs {
<-limiter
fmt.Println("Processing job", j)
}
done <- true
}()
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
<-done
}
Output:
Processing job 1
Processing job 2
Processing job 3
Processing job 4
Processing job 5
Note: Each job is processed at a rate of one per second.
5. Fan-Out/Fan-In with Multiple Stages
Example:
package main
import (
"fmt"
"sync"
)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func cube(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n * n
}
close(out)
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs.
// Each goroutine copies values from its input channel to out until the channel is closed.
for _, c := range cs {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}(c)
}
// Start a goroutine to close out once all the output goroutines are done.
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
nums := generator(2, 3, 4)
squares := square(nums)
cubes := cube(nums)
for result := range merge(squares, cubes) {
fmt.Println(result)
}
}
Output (one possible run):
4
27
16
Note: square and cube receive from the same nums channel, so each value is delivered to exactly one of the two stages, never both. Which stage gets which value varies between runs; to feed every value to both stages, the generator's output must first be duplicated into two separate channels.
11. Advanced Topics
11.1. Context Package
The context
package in Go provides a way to carry deadlines, cancellation signals, and other request-scoped values across API boundaries and between processes.
Use Cases:
Timeouts and cancellations for goroutines.
Passing request-scoped data.
Managing goroutine lifecycles.
Basic Usage:
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Worker stopping:", ctx.Err())
return
default:
fmt.Println("Worker working...")
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go worker(ctx)
time.Sleep(2 * time.Second)
cancel()
time.Sleep(1 * time.Second)
fmt.Println("Main done")
}
Output:
Worker working...
Worker working...
Worker working...
Worker working...
Worker stopping: context canceled
Main done
Common Functions:
context.Background(): Returns an empty root context.
context.TODO(): Used when unsure which context to use.
context.WithCancel(parent): Returns a copy of the parent context with a cancellation function.
context.WithTimeout(parent, timeout): Returns a copy of the parent context with a deadline of now plus timeout.
context.WithDeadline(parent, deadline): Returns a copy of the parent context with a specific deadline.
Example with Timeout:
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Worker stopping:", ctx.Err())
return
default:
fmt.Println("Worker working...")
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go worker(ctx)
<-ctx.Done()
fmt.Println("Main done:", ctx.Err())
}
Output:
Worker working...
Worker working...
Worker working...
Worker stopping: context deadline exceeded
Main done: context deadline exceeded
11.2. Pipelines
A pipeline is a series of connected stages, each processing data and passing it to the next stage. Pipelines are useful for processing streams of data efficiently.
Example:
package main
import (
"fmt"
)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func toString(in <-chan int) <-chan string {
out := make(chan string)
go func() {
for n := range in {
out <- fmt.Sprintf("Number: %d", n)
}
close(out)
}()
return out
}
func main() {
nums := generator(2, 3, 4)
squares := square(nums)
stringsChan := toString(squares)
for s := range stringsChan {
fmt.Println(s)
}
}
Output:
Number: 4
Number: 9
Number: 16
11.3. Fan-In/Fan-Out
Fan-Out: Distributing work across multiple goroutines.
Fan-In: Merging results from multiple goroutines into a single channel.
Example:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(time.Millisecond * 500)
results <- job * 2
}
}
func fanIn(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
jobs := make(chan int, 5)
results := make(chan int, 5)
var wg sync.WaitGroup
// Start 2 workers
for w := 1; w <= 2; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// Send jobs
for j := 1; j <= 4; j++ {
jobs <- j
}
close(jobs)
// Collect results
go func() {
wg.Wait()
close(results)
}()
for res := range results {
fmt.Println("Result:", res)
}
}
Output:
Worker 1 processing job 1
Worker 2 processing job 2
Worker 1 processing job 3
Worker 2 processing job 4
Result: 2
Result: 4
Result: 6
Result: 8
11.4. Worker Pools
A worker pool is a collection of goroutines that process tasks from a shared channel. It limits the number of concurrent tasks, improving resource management.
Example:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(time.Millisecond * 300)
results <- job * 2
}
}
func main() {
const numWorkers = 3
jobs := make(chan int, 5)
results := make(chan int, 5)
var wg sync.WaitGroup
// Start worker pool
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// Send jobs
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// Collect results
go func() {
wg.Wait()
close(results)
}()
for res := range results {
fmt.Println("Result:", res)
}
}
Output:
Worker 1 processing job 1
Worker 2 processing job 2
Worker 3 processing job 3
Worker 1 processing job 4
Worker 2 processing job 5
Result: 2
Result: 4
Result: 6
Result: 8
Result: 10
12. Practical Examples and Projects
Example 1: Concurrent Web Scraper
A web scraper that fetches URLs concurrently using goroutines and channels.
Requirements:
Fetch multiple URLs concurrently.
Limit the number of concurrent fetches.
Collect and display the results.
Implementation:
package main
import (
"fmt"
"io"
"net/http"
"sync"
)
func fetchURL(url string, wg *sync.WaitGroup, ch chan<- string) {
defer wg.Done()
resp, err := http.Get(url)
if err != nil {
ch <- fmt.Sprintf("Error fetching %s: %v", url, err)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
ch <- fmt.Sprintf("Error reading %s: %v", url, err)
return
}
ch <- fmt.Sprintf("Fetched %s: %d bytes", url, len(body))
}
func main() {
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.golang.org",
"https://www.stackoverflow.com",
}
var wg sync.WaitGroup
ch := make(chan string, len(urls))
for _, url := range urls {
wg.Add(1)
go fetchURL(url, &wg, ch)
}
wg.Wait()
close(ch)
for msg := range ch {
fmt.Println(msg)
}
}
Sample Output:
Fetched https://www.google.com: 12500 bytes
Fetched https://www.github.com: 10500 bytes
Fetched https://www.golang.org: 9500 bytes
Fetched https://www.stackoverflow.com: 11500 bytes
Example 2: Real-Time Chat Application
A simple real-time chat application using goroutines and channels.
Features:
Multiple clients can send and receive messages.
Messages are broadcast to all connected clients.
Implementation:
Due to space constraints, a simplified version is presented.
package main
import (
"bufio"
"fmt"
"net"
"sync"
)
var (
clients = make(map[net.Conn]bool)
broadcast = make(chan string)
mu sync.Mutex
)
func handleConnection(conn net.Conn) {
defer conn.Close()
mu.Lock()
clients[conn] = true
mu.Unlock()
reader := bufio.NewReader(conn)
for {
msg, err := reader.ReadString('\n')
if err != nil {
break
}
broadcast <- msg
}
mu.Lock()
delete(clients, conn)
mu.Unlock()
}
func broadcaster() {
for {
msg := <-broadcast
mu.Lock()
for conn := range clients {
fmt.Fprint(conn, msg)
}
mu.Unlock()
}
}
func main() {
listener, err := net.Listen("tcp", ":9000")
if err != nil {
fmt.Println("Error starting server:", err)
return
}
defer listener.Close()
fmt.Println("Chat server started on :9000")
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println("Error accepting connection:", err)
continue
}
go handleConnection(conn)
}
}
Usage:
Run the server.
Connect multiple clients using telnet or nc:
telnet localhost 9000
Type messages in one client and see them broadcast to all connected clients.
Example 3: Parallel Data Processing with Pipelines
A pipeline that processes data in stages: generation, processing, and consumption.
Implementation:
package main
import (
"fmt"
"math/rand"
"time"
)
func generator(count int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < count; i++ {
out <- rand.Intn(100)
time.Sleep(time.Millisecond * 100)
}
close(out)
}()
return out
}
func processor(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for num := range in {
out <- num * num
time.Sleep(time.Millisecond * 200)
}
close(out)
}()
return out
}
func consumer(in <-chan int, done chan<- bool) {
for num := range in {
fmt.Println("Processed number:", num)
}
done <- true
}
func main() {
rand.Seed(time.Now().UnixNano())
done := make(chan bool)
nums := generator(5)
squares := processor(nums)
go consumer(squares, done)
<-done
fmt.Println("All numbers processed.")
}
Sample Output:
Processed number: 1024
Processed number: 576
Processed number: 841
Processed number: 144
Processed number: 900
All numbers processed.
13. Conclusion and Next Steps
Congratulations! You've completed the comprehensive course on Goroutines and Channels in Go. Here's a summary of what you've learned:
Goroutines: Lightweight threads for concurrent execution.
Channels: Safe communication between goroutines.
Channel Operations: Sending, receiving, closing, and ranging over channels.
Buffered vs. Unbuffered Channels: Understanding different channel capacities.
Channel Directionality: Enforcing communication patterns.
Select Statement: Handling multiple channel operations.
WaitGroups: Synchronizing goroutines.
Concurrency Pitfalls: Avoiding race conditions, deadlocks, and goroutine leaks.
Concurrency Patterns: Implementing worker pools, pipelines, fan-in/fan-out, and more.
Advanced Topics: Using the context package for cancellation and timeouts.
Next Steps
To further solidify your understanding and expertise in Go concurrency:
Build Real-World Projects: Apply the concepts by building applications such as web servers, concurrent data processors, or real-time systems.
Explore Go's Standard Library: Many packages in Go's standard library make extensive use of concurrency. Studying these can provide deeper insights.
Performance Optimization: Learn how to profile and optimize concurrent Go programs for better performance.
Advanced Synchronization: Dive deeper into synchronization primitives like mutexes, condition variables, and atomic operations.
Understand Go's Scheduler: Learn how Go's runtime scheduler manages goroutines and threads for better optimization.
Stay Updated: Go continues to evolve. Keep up with the latest features and best practices by following official Go blogs, forums, and community discussions.
Recommended Resources
Books:
The Go Programming Language by Alan A. A. Donovan & Brian W. Kernighan
Concurrency in Go by Katherine Cox-Buday
Thank you for embarking on this journey to master Go's concurrency model! With practice and exploration, you'll be well-equipped to build efficient, concurrent applications in Go.
Happy Coding!
Written by Sundaram Kumar Jha
I like building cloud native stuff: microservices, backends, distributed systems, and cloud native tools using Golang.