Communicating Sequential Processes in Golang

Bogdan N.
12 min read

This blog post comes from a tech talk I gave internally at IKEA.
I will introduce some key concepts to build a mental model for working with concurrency and then dive into Go with examples that you can find in this repo.

The goal of this blog post is to refresh these concepts for senior engineers and to introduce concurrency, parallelism, race conditions, deadlocks, mutexes and more to anyone who hasn’t had to deal with concurrent programming directly.

Concurrency

We can consider concurrency as parts of the program that execute independently in a non-deterministic order.

Concurrency is NOT Parallelism: depending on the hardware we can run parts of the program at the same time (in parallel) but just because we have concurrency it doesn’t mean we have parallelism.

What does it mean, “non-deterministic order”? The behaviour differs between runs, but the result is the same. For example, a process(*) can have different traces of execution:
{1, 2a, 3a, 2b, 3b, 4}
{1, 2a, 2b, 3a, 3b, 4}
{1, 3a, 3b, 2a, 2b, 4}

*process: a series of actions or events to achieve a goal

We can look at a concurrent program with a non-deterministic trace of execution like this:

Concurrency vs Parallelism​

Concurrency: dealing with things happening out of order.

Parallelism: dealing with things happening at the same time.

Parallelism will make your program faster while concurrency doesn’t have to. Concurrency can happen in a single core processor​. Parallelism can only happen in a multi-core processor.

Concurrency can make your program slower if you are on a single-core machine or your logic doesn't need concurrency. But it can make your program faster when you have to wait for something – by letting other parts of the program execute in the meantime.

Why do we need concurrency?

  • Enable parallelism

  • Let parts of the program execute independently

  • Remove the need to wait for an event

If we look at our previous concurrent example but this time we have access to parallelism as well, we can see that the program will be much faster:

Another picture to illustrate concurrency, parallelism and everything put together:

Concurrency Problems

The main issue with concurrency comes from its non-deterministic nature that can produce race conditions.

For example:
{1, 2a, 3a, 2b, 3b, 4}​
{1, 2a, 2b, 3a, 3b, 4}​
{1, 3a, 3b, 2a, 2b, 4}​
{1, 3a, 2a, 2b, 3b, 4} <- Race condition

Race Condition

What is it and when does it happen?
- When two or more processes try to alter a shared resource simultaneously.
- It happens when there's a fault in the timing or ordering of events.
- A data race is one kind of race condition: unsynchronized concurrent access to the same memory.
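To make this concrete (jumping ahead to Go for a moment), here is a minimal hypothetical program with a data race: two goroutines increment the same counter with no synchronization, so increments can be lost. Running it with `go run -race` reports the race.

```go
package main

import (
	"fmt"
	"sync"
)

// racyCount increments a shared counter from two goroutines WITHOUT any
// locking: the read-modify-write steps of counter++ can interleave,
// so some increments are lost. `go run -race` flags this.
func racyCount() int {
	counter := 0
	var wg sync.WaitGroup
	for g := 0; g < 2; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				counter++ // unsynchronized shared write: data race
			}
		}()
	}
	wg.Wait()
	return counter // often less than 2000
}

func main() {
	fmt.Println("counter:", racyCount())
}
```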

How to avoid Race Conditions?

  • Make the operations atomic: for example, make READ, MODIFY and STORE one indivisible operation

  • Ensure that the shared resource, if being modified, is used by one process at a time – one atomic operation

  • Atomicity adds more sequential order to our operations

Looking at the previous example, we can make operations atomic and add a more sequential order that prevents our program from running into race conditions. The drawback is that by making the outcome more deterministic you reduce the speed of execution.
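One way to get that atomicity in Go is the sync/atomic package (a sketch of mine, not from the original talk): atomic.AddInt64 performs the read-modify-store as a single indivisible operation, so a counter shared by two goroutines always ends at the expected value.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicCount is a race-free shared counter:
// atomic.AddInt64 makes each READ+MODIFY+STORE a single operation.
func atomicCount() int64 {
	var counter int64
	var wg sync.WaitGroup
	for g := 0; g < 2; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				atomic.AddInt64(&counter, 1)
			}
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println("counter:", atomicCount()) // always 2000
}
```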

Mutex (Lock)

A great way to prevent race conditions is to use a mutex: you can think of it as a lock that controls who has access to shared resources. We use a mutex for mutual exclusion to control concurrency.
Wikipedia has a great definition: a synchronization primitive that prevents state from being modified or accessed by multiple threads of execution at once. Locks enforce mutual exclusion concurrency control policies.

I also like this response from Stack Overflow about using a rubber chicken to allow people to talk in a heated discussion while others wait.

Deadlock

A possible issue with concurrency is having a deadlock, which happens when:
- Two (or more) processes are waiting for each other, so none of them can proceed
- Each process holds a resource (for example a lock) that another one is waiting for, and nobody releases anything

Key Concurrency Concepts

  • Use concurrency to enable parallelism and/or avoid waiting for events

  • Make sure your shared resources don't produce race conditions by using mutual exclusion and atomic operations

Golang

It’s a language designed for building servers in the cloud. Go's concurrency model is based on Hoare's CSP (Communicating Sequential Processes, 1978) and even on Dijkstra's guarded commands (1975).
Golang's main difference from other similar languages is channels. Erlang is closer to the original CSP, where you communicate with a process by name rather than over a channel.

Github Repo with examples

I will be using examples that are available on my Github, where you can also find links to the Go playground if you don’t want to run the code locally.

Goroutines

  • A goroutine is a lightweight "thread" we use for concurrent operations

  • Use goroutines for async programming: spin up a goroutine to do something else while your program continues, by putting the go keyword before the function call

  • You can use anonymous functions with goroutines

  • Goroutine is a wordplay on coroutine

package main

import (
    "fmt"
    "time"
)

// print the ints in the loop
func printFrom(from string) {
    for i := 0; i < 3; i++ {
        fmt.Println(from, ":", i)
    }
}

func main() {
    printFrom("direct")

    // if you keep launching the program sometimes you will see goroutine2 first
    go printFrom("goroutine1: runs concurrently")
    go printFrom("goroutine2: runs concurrently")

    go func(msg string) {
        fmt.Println(msg)
    }("anonymous goroutine")

    // wait a second for goroutines to complete
    // otherwise the program finishes without displaying results from our goroutines
    time.Sleep(time.Second)
    fmt.Println("done")
}

Channels

  • A channel is like a (UNIX) pipe: data goes in one end and comes out of another in the same order until the channel is closed​

  • Channels act as first-in-first-out queues

  • Channels are bi-directional by default: you can restrict a channel's direction

  • You can have multiple writers and readers on the same channel

package main

import "fmt"

func main() {
    fmt.Println("starting the program")
    // make a new channel
    messages := make(chan string)

    // use anonymous goroutines to send the strings to the channel
    go func() { messages <- "first msg" }()
    go func() { messages <- "second msg" }()

    // we only loop twice because we know we sent 2 messages
    for i := 0; i < 2; i++ {
        fmt.Println(<-messages)
    }
}

In this example we can see how to send 2 messages concurrently to the same channel and read from the channel using a for loop.

Key concepts about channels:

  • A send on a channel blocks until someone reads the data, unless the channel is buffered

  • You can have receive only or send only channels​

  • You can pass channels in between functions​

  • You can NOT send data to a closed channel
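The direction restriction from the bullets above looks like this (a small sketch of my own, not from the repo): a `chan<-` parameter can only be sent to and a `<-chan` parameter can only be received from, and the compiler enforces both.

```go
package main

import "fmt"

// produce can only SEND on out (chan<- string);
// receiving from out here would be a compile error.
func produce(out chan<- string) {
	out <- "hello"
	close(out)
}

// consume can only RECEIVE from in (<-chan string).
func consume(in <-chan string) string {
	return <-in
}

func main() {
	ch := make(chan string)
	go produce(ch) // a bidirectional channel converts implicitly
	fmt.Println(consume(ch))
}
```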

Deadlock example with channels

package main

import "fmt"

func main() {
    fmt.Println("starting the program")
    // make a new unbuffered channel
    messages := make(chan string)

    // we pass the channel in the function
    // share memory by communicating
    deadlockExample(messages)
    // normalLoop(messages)
}

func deadlockExample(messages chan string) {
    // use anonymous goroutines to send the strings to the channel
    go func() { messages <- "first msg" }()
    go func() { messages <- "second msg" }()
    // Deadlock!
    // this loop is waiting forever on the channel
    // because the channel is never closed
    for msg := range messages {
        // display the data we got
        fmt.Println(msg)
    }
}

func normalLoop(messages chan string) {
    go func() {
        // send the data
        messages <- "first msg"
        messages <- "second msg"
        // close the channel
        close(messages)
    }()

    // now that the channel is closed this for loop won't wait forever
    for msg := range messages {
        // get the data from the channel
        // display the data we got
        fmt.Println(msg)
    }
}

Key Considerations:

  • A for ... range loop over a channel will wait for data indefinitely until the channel is closed

  • The sender should CLOSE the channel
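Receivers can also detect a closed channel explicitly with the two-value receive form, which `for range` uses under the hood (a sketch I'm adding, not from the talk): `ok` becomes false once the channel is closed and drained.

```go
package main

import "fmt"

// drain receives until the channel is closed and empty,
// using the comma-ok receive form.
func drain(ch chan string) []string {
	var got []string
	for {
		msg, ok := <-ch
		if !ok {
			// channel closed and drained: stop receiving
			return got
		}
		got = append(got, msg)
	}
}

func main() {
	ch := make(chan string, 2)
	ch <- "first msg"
	ch <- "second msg"
	close(ch) // the sender closes; receivers see ok == false after draining
	fmt.Println(drain(ch))
}
```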

Buffered channels

For better performance we can use buffered channels if we know the capacity we will need.

package main

import "fmt"

func main() {
    messages := []string{"first msg", "second msg", "third msg"}

    // this channel only has capacity for 3
    // but we will use concurrency to share memory and send the latest message
    // we can use buffered channels so we never have to wait for the receiver to store the data
    // we store the data in the buffer, if the buffer is full we block
    bufferedChannel := make(chan string, 3)

    messages = addMessageToSlice("last msg", messages)
    // by enabling concurrency by using "go" keyword in front of this function
    // we send the latest message to the channel when the first one is out
    // if we remove the "go" keyword we will have a deadlock!
    go sendMessagesToChannel(messages, bufferedChannel)

    for m := range bufferedChannel {
        fmt.Println(m)
    }
}

func sendMessagesToChannel(messages []string, bufchan chan string) {
    for _, m := range messages {
        bufchan <- m
    }
    // remember to always close the channel
    close(bufchan)
}

func addMessageToSlice(msg string, messages []string) []string {
    messages = append(messages, msg)
    return messages
}

If we remove the go keyword from go sendMessagesToChannel(messages, bufferedChannel) we will have bufchan <- m waiting to send a 4th message while the capacity is full, and the for loop in main still waiting for the data to start flowing – a deadlock!
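You can watch the buffer absorbing sends with the built-in len and cap functions (a minimal sketch of mine): up to cap(ch) sends succeed with no receiver at all, and only the next send would block.

```go
package main

import "fmt"

// fillBuffer sends exactly cap(ch) messages with no receiver present
// and reports how full the buffer is.
func fillBuffer() (used, capacity int) {
	ch := make(chan string, 3)
	// no receiver exists, but these three sends do not block:
	// they just fill the buffer
	ch <- "first msg"
	ch <- "second msg"
	ch <- "third msg"
	// a fourth send here would block forever (buffer full, no receiver),
	// the same deadlock as removing the `go` keyword above
	return len(ch), cap(ch)
}

func main() {
	used, capacity := fillBuffer()
	fmt.Println(used, "of", capacity, "buffer slots used")
}
```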

WaitGroup

WaitGroup is an object from the sync package that lets you control the flow in a more sequential way; check the example below.

package main

import (
    "fmt"
    "sync"
)

func main() {
    messages := []string{"first msg", "second msg", "third msg"}
    moreMessages := []string{"fourth msg", "fifth msg", "last msg"}

    bufferedChannel := make(chan string, 3)

    // we make an object to control the flow of our execution
    // it contains a counter for each executed function
    wg := sync.WaitGroup{}
    for _, m := range moreMessages {
        // every time we spin a new goroutine we increase the counter
        // on how much we have to wait
        wg.Add(1)
        // keep in mind the messages will not arrive in order because goroutines spawn concurrently (non-deterministic behaviour)
        go func(m string) {
            // NOTE: appending to the shared slice from several goroutines
            // is itself a data race – `go run -race` will flag it
            messages = addMessageToSlice(m, messages)
            // once the execution is done we let our WaitGroup know (this reduces the counter)
            wg.Done()
        }(m)

    }

    // we make sure we wait for the previous block to finish
    wg.Wait()

    go sendMessagesToChannel(messages, bufferedChannel)

    for m := range bufferedChannel {
        fmt.Println(m)
    }
}

func sendMessagesToChannel(messages []string, bufchan chan string) {
    for _, m := range messages {
        bufchan <- m
    }
    // remember to always close the channel
    close(bufchan)
}

func addMessageToSlice(msg string, messages []string) []string {
    messages = append(messages, msg)
    return messages
}

Don’t use WaitGroup just because you can

Here’s an example of the same code as above but refactored to work as expected without using WaitGroup.

package main

import (
    "fmt"
)

func main() {
    messages := []string{"first msg", "second msg", "third msg"}
    moreMessages := []string{"fourth msg", "fifth msg", "last msg"}

    bufferedChannel := make(chan string, 3)

    // we can simply append the extra messages sequentially before sending them to the channel
    for _, m := range moreMessages {
        messages = addMessageToSlice(m, messages)
    }

    go sendMessagesToChannel(messages, bufferedChannel)

    for m := range bufferedChannel {
        fmt.Println(m)
    }
}

func sendMessagesToChannel(messages []string, bufchan chan string) {
    for _, m := range messages {
        bufchan <- m
    }
    close(bufchan)
}

func addMessageToSlice(msg string, messages []string) []string {
    return append(messages, msg)
}

Channel Key Points

  • Used to share memory by communicating

  • Close the channel when receivers need to know no more data is coming (an unclosed channel is still garbage-collected once it becomes unreachable)

  • If you already know your channel's capacity use a buffered channel

Select

  • The select statement lets a goroutine wait on multiple communication operations

  • A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready

package main

import (
    "fmt"
    "time"
)

func main() {
    // Create two channels
    channel1 := make(chan string)
    channel2 := make(chan string)

    // msg will arrive after 2 seconds
    go func() {
        time.Sleep(2 * time.Second)
        channel1 <- "Message from Channel 1"
    }()

    // msg will arrive after 1 second
    go func() {
        time.Sleep(1 * time.Second)
        channel2 <- "Message from Channel 2"
    }()

    // we execute this block twice, and each time we check
    // if we got a msg back,
    // then print whichever is ready
    for i := 0; i < 2; i++ {
        // select block waits for one of the channels to receive a msg
        // it waits for at least one channel to be ready
        select {
        case msg1 := <-channel1:
            fmt.Println(msg1)
        case msg2 := <-channel2:
            fmt.Println(msg2)
            // we can add a default case for when we don't want to wait for a msg
            // default:
            //    fmt.Println("default")
        }
    }
}

If you are unsure about the data arriving but want to proceed anyway, have a default case in your select.

package main

import (
    "fmt"
    "time"
)

func main() {
    channel := make(chan string)

    // wait 2s and send data to the channel
    go func() {
        time.Sleep(2 * time.Second)
        channel <- "Data from Goroutine"
    }()

    // infinite loop
    for {
        select {
        case msg := <-channel:
            fmt.Println(msg)
            return // break infinite loop

        // default case simply simulates some work
        // we run it on every iteration until the channel case is ready
        default:
            fmt.Println("No data yet, doing other work...")
            time.Sleep(500 * time.Millisecond) // Simulate some work
        }
    }
}

Mutex

  • Use a mutex to access your shared resources​

  • Think about whether you really want or need to have shared state

Mutex ensures only one process is modifying the shared data at a time​.

package main

import (
    "fmt"
    "sync"
)

type Container struct {
    counters map[string]int // shared resource that doesn't support concurrent writes
    mu       sync.Mutex     // protect the shared resource with a lock
}

func (c *Container) inc(name string) {
    // try removing the lock here and see what happens
    // the program will panic with an error
    // fatal error: concurrent map writes
    c.mu.Lock()
    defer c.mu.Unlock()
    c.counters[name]++
}

func (c *Container) dec(name string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.counters[name]--
}

func main() {
    c := Container{
        counters: map[string]int{"a": 0, "b": 0}, // map doesn't support concurrent access
    }

    var wg sync.WaitGroup

    increment := func(name string, n int) {
        for i := 0; i < n; i++ {
            c.inc(name) // increment counter
        }
        wg.Done()
    }

    decrement := func(name string, n int) {
        for i := 0; i < n; i++ {
            c.dec(name) // decrement counter
        }
        wg.Done()
    }

    // if your WaitGroup counter is lower than needed your program will panic
    // if it's higher it will be blocked in a deadlock
    wg.Add(3)
    go decrement("a", 1000)
    go increment("a", 1000)
    go increment("b", 1000)

    wg.Wait()

    fmt.Println("Final Counters:", c.counters)
}

There’s more to concurrency in Go!

This article gives you enough to get started and to understand the concurrency model, but to master it going forward I’d recommend looking into patterns like Fan-In/Fan-Out (multiplexing) and pipelines.
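As a taste of Fan-In (multiplexing), here is a minimal sketch of my own: one goroutine per input channel forwards messages into a single output channel, and a WaitGroup closes the output once every input is drained.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn multiplexes several input channels into one output channel.
func fanIn(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan string) {
			defer wg.Done()
			for msg := range in {
				out <- msg // forward everything from this input
			}
		}(in)
	}
	go func() {
		wg.Wait() // all inputs drained
		close(out)
	}()
	return out
}

func main() {
	a := make(chan string, 1)
	b := make(chan string, 1)
	a <- "from a"
	b <- "from b"
	close(a)
	close(b)
	for msg := range fanIn(a, b) {
		fmt.Println(msg)
	}
}
```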

Additionally, once you start developing services it’s a good practice to rely on context for cancellation and more.
