Multi Threading in odin lang

Dhadve Yash
5 min read

In this blog we will take a look at how to use threads in Odin. Threads are an important and fairly advanced topic, so they can be hard to understand at first. Odin's threading tools (wait groups, mutexes, and channels) resemble the ones Go provides, so they will feel familiar if you have used Go.

Creating threads

package main

import "core:fmt"
import "core:thread"
import "core:sync"

// Package-level mutex.
// NOTE(review): nothing in this listing locks or unlocks it.
myMutex :sync.Mutex

// Payload that main attaches to each thread through the thread's data field.
workerData :: struct {
    // Points at the Wait_Group declared in main; the worker calls
    // sync.wait_group_done on it to report that its work is finished.
    waitgroupdata: ^sync.Wait_Group,
}

// main starts two worker threads, blocks until both have reported completion
// through a shared Wait_Group, and then releases the thread handles.
main :: proc() {
    // The wait group counts the workers that have not finished yet.
    wg : sync.Wait_Group

    // First worker thread. It is only created here; it begins running
    // when thread.start is called further down.
    t1 := thread.create(worker)
    // Release the thread handle when main returns so it is not leaked.
    defer thread.destroy(t1)
    t1.init_context = context
    t1.user_index = 1
    t1.data = &workerData{ waitgroupdata = &wg}

    // Second worker thread. It gets its own index so the printed
    // messages of the two threads can be told apart.
    t2 := thread.create(worker)
    defer thread.destroy(t2)
    t2.init_context = context
    t2.user_index = 2
    t2.data = &workerData{ waitgroupdata = &wg}

    // Register both workers before starting them: the counter is set to 2
    // and each worker decrements it once via sync.wait_group_done.
    sync.wait_group_add(&wg, 2)
    thread.start(t1)
    thread.start(t2)

    // Block here until the counter is back to 0, i.e. until every worker
    // has called sync.wait_group_done.
    sync.wait_group_wait(&wg)
}

// worker is the procedure run by each thread created in main.
// It expects t.data to point at a workerData and signals the wait group
// stored there once its work is finished.
worker :: proc(t: ^thread.Thread) {
    fmt.printf("work of thread  %d started \n", t.user_index)
    // Recover the payload that main stored in t.data.
    payload := cast(^workerData)t.data
    // printf rather than println, so that %d is replaced by the thread's index.
    fmt.printf("work of thread %d done\n", t.user_index)
    // Decrement the wait group's counter to tell main this worker is done.
    sync.wait_group_done(payload.waitgroupdata)
}

Output -

work of thread 1 started 
work of thread 2 started
work of thread 2 done
work of thread 1 done

Now let's try creating many threads

package main

import "core:fmt"
import "core:thread"
import "core:time"

// Values printed by the threads; each thread's user_index selects one entry.
arr := []int{1,2,3}
// main starts one thread per element of arr, then polls the running threads
// and destroys each one as soon as it reports that it is done.
main :: proc() {
    // Handles of the threads that are still running; capacity is reserved
    // up front for one handle per element of arr.
    pool := make([dynamic]^thread.Thread, 0, len(arr))
    defer delete(pool)

    for index in 0..<len(arr) {
        handle := thread.create(worker)
        if handle == nil {
            // No thread was created for this index; move on to the next one.
            continue
        }
        // The new thread runs with a copy of the current context.
        handle.init_context = context
        // The index doubles as the thread's id and as its position in arr.
        handle.user_index = index
        append(&pool, handle)
        thread.start(handle)
    }

    // Keep sweeping over the pool until every handle has been removed.
    for len(pool) > 0 {
        cursor := 0
        for cursor < len(pool) {
            current := pool[cursor]
            if !thread.is_done(current) {
                // Still working: leave it in the pool and look at the next handle.
                cursor += 1
                continue
            }
            fmt.printf("Thread %d is done \n", arr[current.user_index])
            thread.destroy(current)
            // Removing shifts the following handles down, so the cursor stays put.
            ordered_remove(&pool, cursor)
        }
    }

}

// worker runs on each spawned thread and prints the entry of arr selected
// by the thread's user_index.
worker :: proc(t: ^thread.Thread) {
    value := arr[t.user_index]
    fmt.printf("work of t%d \n", value)
}

output -

work of t3 
work of t2 
work of t1 
Thread 3 is done 
Thread 1 is done 
Thread 2 is done

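Preventing data races using a mutex

When several threads use the same data at once, they can interfere with each other. To prevent this, lock a sync.Mutex before touching the shared data and unlock it once you are done; only one thread can hold the lock at a time.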

// Mutex shared by all worker threads; worker locks it at the start of its
// body and unlocks it at the end.
myMutex :sync.Mutex

// worker prints a start message and, one second later, a completion message
// while holding myMutex, so only one thread at a time runs this section.
worker :: proc(t: ^thread.Thread) {
    // sync.lock waits until myMutex is available and then acquires it.
    // It operates on the mutex itself, so the mutex is passed by address;
    // passing it by value would hand over a copy instead of the shared lock.
    sync.lock(&myMutex)
    // Deferring the unlock releases the mutex on every exit path.
    defer sync.unlock(&myMutex)

    fmt.printf("work of t%d started \n", arr[t.user_index])
    time.sleep(time.Second)
    fmt.printf("work of t%d completed \n", arr[t.user_index])
    fmt.println()
}

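(Just replace the worker procedure in the program above with this one, and add import "core:sync" to that program's imports, since the mutex comes from the sync package.)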

Output -

work of t1 started 
// waits for a sec
work of t1 completed 

work of t2 started 
// waits for a sec
work of t2 completed 

work of t3 started 
// waits for a sec
work of t3 completed

Communicating between threads using channels

package main

import "core:fmt"
import "core:thread"
import "core:time"
import "core:sync"
import "core:sync/chan"

// Package-level mutex.
// NOTE(review): nothing in this listing locks or unlocks it.
my_mutex :sync.Mutex

// Payload that main attaches to both worker threads.
wData :: struct {
    // Wait_Group the workers use to report completion back to main.
    waitgroupdata: ^sync.Wait_Group,
    // Channel carrying int values between the workers:
    // worker1 sends on it and worker2 receives from it.
    message_channel: ^chan.Chan(int),
}

// main creates an int channel, starts worker1 (the sender) and worker2
// (the receiver) on their own threads, waits for both to finish, and then
// releases the threads and the channel.
main :: proc() {
    wg : sync.Wait_Group

    // Create a channel that carries int values. Creation allocates, so it
    // can fail; stop early instead of handing the workers an unusable channel.
    mchan, err := chan.create(chan.Chan(int), context.allocator)
    if err != .None {
        fmt.println("couldn't create channel:", err)
        return
    }
    // Deferred calls run in reverse order, so the channel is freed only
    // after both threads below have been destroyed.
    defer chan.destroy(mchan)

    t1 := thread.create(worker1)
    defer thread.destroy(t1)
    t2 := thread.create(worker2)
    defer thread.destroy(t2)

    t1.init_context = context
    t1.user_index = 1

    t2.init_context = context
    t2.user_index = 2

    // Both workers receive pointers to the same wait group and channel.
    t1.data = &wData{ waitgroupdata = &wg, message_channel = &mchan }
    t2.data = &wData{ waitgroupdata = &wg, message_channel = &mchan }

    // Two workers will each call sync.wait_group_done once.
    sync.wait_group_add(&wg, 2)
    thread.start(t1)
    thread.start(t2)
    // Block until both workers have reported completion.
    sync.wait_group_wait(&wg)
}

// worker1 is the sending side: it puts the value 24 on the shared channel,
// reports whether the send succeeded, and then signals the wait group.
worker1 :: proc(t: ^thread.Thread) {
    fmt.printf("work of t1 started \n")
    shared := cast(^wData)t.data

    // Hand 24 to the channel so that a thread receiving on it can pick it up.
    sent := chan.send(shared.message_channel^, 24)
    if !sent {
        fmt.println("couldn't send message")
    } else {
        fmt.println("t1 sent message via chan")
    }

    sync.wait_group_done(shared.waitgroupdata)
}

// worker2 is the receiving side: it takes one value from the shared channel,
// prints it (or an error message), and then signals the wait group.
worker2 :: proc (t: ^thread.Thread){
    fmt.printf("work of t2 started \n")
    shared := cast(^wData)t.data

    // Execution pauses on recv until a value comes through the channel.
    value, received := chan.recv(shared.message_channel^)
    if !received {
        fmt.printf("Something went wrong\n")
    } else {
        fmt.printf("Recieved %d from t1\n", value)
    }

    sync.wait_group_done(shared.waitgroupdata)
}

output -

work of t2 started 
work of t1 started 
Recieved 24 from t1
t1 sent message via chan

Creating a ThreadPool using thread.Pool

// main queues five tasks on a thread.Pool of five threads, waits for all of
// them to finish, and prints "End".
main :: proc() {
    // Arena whose allocator is handed to every task that is queued below.
    client_arena :virtual.Arena
    if arena_err := virtual.arena_init_growing(&client_arena, 1 * mem.Byte); arena_err != .None {
        fmt.println("couldn't initialise arena:", arena_err)
        return
    }
    // Registered before the pool's cleanup, so (deferred calls running in
    // reverse order) the arena is released only after the pool is destroyed.
    defer virtual.arena_destroy(&client_arena)
    client_allocator := virtual.arena_allocator(&client_arena)

    // Pool with five threads, using the context allocator for the pool itself.
    threadPool :thread.Pool
    thread.pool_init(&threadPool, context.allocator, 5)
    defer thread.pool_destroy(&threadPool)
    thread.pool_start(&threadPool)

    for i := 0; i < 5; i+=1{
        // worker reads only the task's user_index, so no data pointer is
        // needed and nil is passed for it.
        thread.pool_add_task(&threadPool, client_allocator, worker, nil, i)
    }
    // Wait until every queued task has been processed.
    thread.pool_finish(&threadPool)
    fmt.println("End")
}

// worker is the task procedure queued on the pool; it prints the user_index
// that was passed to thread.pool_add_task for this task.
worker :: proc (t: thread.Task){
    task_index := t.user_index
    fmt.printf("working on thread %d \n", task_index)
}
10
Subscribe to my newsletter

Read articles from Dhadve Yash directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Dhadve Yash
Dhadve Yash