Multi Threading in odin lang
Dhadve Yash
5 min read
In this blog we will take a look at how to use threads in Odin. Threads are an important and fairly advanced topic, so understanding them can be hard at first. The tools Odin gives you for coordinating threads (wait groups, mutexes and channels) are similar to the ones Go provides.
Creating threads
package main
import "core:fmt"
import "core:thread"
import "core:sync"
// NOTE(review): this mutex is declared but never locked or unlocked by the
// code in this first example.
myMutex :sync.Mutex
// workerData is the payload handed to each thread through its `data` field;
// the worker procedure casts `t.data` back to ^workerData to read it.
workerData :: struct {
// Pointer to the wait group declared in main. The worker passes it to
// sync.wait_group_done when its work is finished, which is how the main
// thread learns that the worker has completed.
waitgroupdata: ^sync.Wait_Group
}
// main spawns two worker threads and blocks until both of them have reported
// completion through a shared wait group.
main :: proc() {
	// The wait group keeps a counter of workers that are still running.
	wg: sync.Wait_Group

	// Both workers only need the pointer to the wait group, so a single
	// payload that lives until the end of main can be shared between them.
	payload := workerData{waitgroupdata = &wg}

	// Creating the first thread.
	t1 := thread.create(worker)
	t1.init_context = context
	t1.user_index = 1
	t1.data = &payload

	// Creating the second thread. It gets its own user_index so the two
	// workers can be told apart in the printed output.
	t2 := thread.create(worker)
	t2.init_context = context
	t2.user_index = 2
	t2.data = &payload

	// Tell the wait group how many workers are about to run (2 here).
	// This must happen before the threads start, so that no worker can
	// call wait_group_done before the counter has been raised.
	sync.wait_group_add(&wg, 2)

	// Starting our threads.
	thread.start(t1)
	thread.start(t2)

	// Block until the counter is back at 0, i.e. until every worker has
	// called sync.wait_group_done.
	sync.wait_group_wait(&wg)

	// Release the thread handles created by thread.create.
	thread.destroy(t1)
	thread.destroy(t2)
}
// worker is the procedure run by each thread. It prints a start and a done
// message tagged with the thread's user_index, then signals the wait group
// it received through t.data.
worker :: proc(t: ^thread.Thread) {
	fmt.printf("work of thread %d started \n", t.user_index)

	// Recover the payload that main stored in the thread's data pointer.
	payload := cast(^workerData)t.data

	// printf (not println) is required here so that %d is substituted.
	fmt.printf("work of thread %d done\n", t.user_index)

	// Decrement the wait group counter to report that this worker is done.
	sync.wait_group_done(payload.waitgroupdata)
}
Output -
work of thread 1 started
work of thread 2 started
work of thread 2 done
work of thread 1 done
Now let's try creating many threads
package main
import "core:fmt"
import "core:thread"
import "core:time"
// Labels printed by the program: each thread's user_index is used as an
// index into this slice, and one thread is created per element.
arr := []int{1,2,3}
// main starts one thread per element of arr, then repeatedly sweeps the list
// of running threads, destroying and removing each one once it is done.
main :: proc() {
	// Dynamic array of thread pointers with room reserved for len(arr) entries.
	pool := make([dynamic]^thread.Thread, 0, len(arr))
	defer delete(pool)

	for idx in 0..<len(arr) {
		handle := thread.create(worker)
		if handle == nil {
			// Creation failed; nothing to configure or start.
			continue
		}
		// Give the thread the current context and its index into arr,
		// remember it in the pool, then let it run.
		handle.init_context = context
		handle.user_index = idx
		append(&pool, handle)
		thread.start(handle)
	}

	// Keep sweeping until the pool is empty. The pool shrinks because every
	// finished thread is destroyed and removed from it below.
	for len(pool) > 0 {
		pos := 0
		for pos < len(pool) {
			current := pool[pos]
			if !thread.is_done(current) {
				// Still running: move on to the next entry.
				pos += 1
				continue
			}
			fmt.printf("Thread %d is done \n", arr[current.user_index])
			thread.destroy(current)
			// Removing the entry shifts the later ones down, so pos is
			// deliberately left unchanged here.
			ordered_remove(&pool, pos)
		}
	}
}
// worker prints the arr entry selected by the user_index that main assigned
// to this thread.
worker :: proc(t: ^thread.Thread) {
	label := arr[t.user_index]
	fmt.printf("work of t%d \n", label)
}
output -
work of t3
work of t2
work of t1
Thread 3 is done
Thread 1 is done
Thread 2 is done
Handling race conditions using a mutex
To keep threads from stepping on each other's work, we simply lock our shared data with a sync.Mutex before using it and unlock it once we are done.
// myMutex guards the critical section in worker so that only one thread at a
// time can run it.
myMutex: sync.Mutex

// worker prints a start message, sleeps for one second and prints a
// completion message, all while holding myMutex, so the output of different
// threads cannot interleave.
worker :: proc(t: ^thread.Thread) {
	// sync.lock takes a pointer to the mutex. If another thread currently
	// holds myMutex, this call waits until it is released and only then
	// continues with the code below.
	sync.lock(&myMutex)
	// Release the mutex when the procedure exits so other threads can take it.
	defer sync.unlock(&myMutex)

	fmt.printf("work of t%d started \n", arr[t.user_index])
	time.sleep(time.Second)
	fmt.printf("work of t%d completed \n", arr[t.user_index])
	fmt.println()
}
(Just replace the worker function in the program above with this, and add import "core:sync" to its imports, since the mutex comes from that package.)
Output -
work of t1 started
// waits for a sec
work of t1 completed
work of t2 started
// waits for a sec
work of t2 completed
work of t3 started
// waits for a sec
work of t3 completed
Communicating between threads using channels
package main
import "core:fmt"
import "core:thread"
import "core:time"
import "core:sync"
import "core:sync/chan"
// NOTE(review): my_mutex is declared but not used by any procedure in this
// channel example.
my_mutex :sync.Mutex
// wData is the payload passed to worker1 and worker2 through the thread's
// `data` field.
wData :: struct {
// Wait group the worker signals with sync.wait_group_done when it finishes.
waitgroupdata: ^sync.Wait_Group,
// message_channel is like a canal/conveyor belt:
// a value sent into it by one thread can be received by another.
message_channel: ^chan.Chan(int)
}
// main creates an int channel, starts a sender thread (worker1) and a
// receiver thread (worker2) that share it, and waits for both to finish.
main :: proc() {
	wg: sync.Wait_Group

	// Creating a new channel which can pass int type data.
	mchan, err := chan.create(chan.Chan(int), context.allocator)
	if err != .None {
		// Without a channel the workers have nothing to talk over.
		fmt.eprintln("could not create channel:", err)
		return
	}
	// Runs last, after the threads below have been destroyed.
	defer chan.destroy(mchan)

	// Both workers need the same wait group and channel, so they can share
	// one payload that lives until the end of main.
	payload := wData{waitgroupdata = &wg, message_channel = &mchan}

	t1 := thread.create(worker1)
	t1.init_context = context
	t1.user_index = 1
	t1.data = &payload

	t2 := thread.create(worker2)
	t2.init_context = context
	t2.user_index = 2
	t2.data = &payload

	// Register both workers before starting them.
	sync.wait_group_add(&wg, 2)
	thread.start(t1)
	thread.start(t2)

	// Block until both workers have called sync.wait_group_done.
	sync.wait_group_wait(&wg)

	// Release the thread handles created by thread.create.
	thread.destroy(t1)
	thread.destroy(t2)
}
// worker1 is the sending side: it pushes the value 24 into the shared channel,
// reports whether the send succeeded, and then signals the wait group.
worker1 :: proc(t: ^thread.Thread) {
	fmt.printf("work of t1 started \n")

	payload := cast(^wData)t.data
	// Signal the wait group on the way out, after the result was printed.
	defer sync.wait_group_done(payload.waitgroupdata)

	// Put 24 on the channel so that another thread holding the same
	// channel can pick it up.
	sent := chan.send(payload.message_channel^, 24)
	if !sent {
		fmt.println("couldn't send message")
	} else {
		fmt.println("t1 sent message via chan")
	}
}
// worker2 is the receiving side: it takes one value from the shared channel,
// prints it (or an error message if the receive failed), and then signals the
// wait group.
worker2 :: proc (t: ^thread.Thread){
	fmt.printf("work of t2 started \n")

	payload := cast(^wData)t.data
	// Signal the wait group on the way out, after the result was printed.
	defer sync.wait_group_done(payload.waitgroupdata)

	// Execution waits here until a value arrives on the channel.
	value, received := chan.recv(payload.message_channel^)
	if !received {
		fmt.printf("Something went wrong\n")
	} else {
		fmt.printf("Recieved %d from t1\n", value)
	}
}
output -
work of t2 started
work of t1 started
Recieved 24 from t1
t1 sent message via chan
Creating a ThreadPool using thread.Pool
// main sets up a growing arena for task allocations, runs five tasks on a
// thread.Pool with five threads, waits for all of them and prints "End".
main :: proc() {
	// Arena whose allocator is handed to every task added to the pool.
	client_arena: virtual.Arena
	arena_allocator_error := virtual.arena_init_growing(&client_arena, 1 * mem.Byte)
	if arena_allocator_error != .None {
		fmt.eprintln("could not initialise arena:", arena_allocator_error)
		return
	}
	// Deferred first so that it runs last, after the pool has been destroyed.
	defer virtual.arena_destroy(&client_arena)
	client_allocator := virtual.arena_allocator(&client_arena)

	threadPool: thread.Pool
	thread.pool_init(&threadPool, context.allocator, 5)
	defer thread.pool_destroy(&threadPool)
	thread.pool_start(&threadPool)

	for i in 0..<5 {
		// The tasks need no payload, so the data pointer is nil; the loop
		// counter is passed as the task's user_index.
		thread.pool_add_task(&threadPool, client_allocator, worker, nil, i)
	}

	// Wait for the remaining tasks to be processed before continuing.
	thread.pool_finish(&threadPool)
	fmt.println("End")
}
// worker is the task procedure run by the pool; it prints the user_index that
// was supplied when the task was added.
worker :: proc (t: thread.Task){
	task_index := t.user_index
	fmt.printf("working on thread %d \n", task_index)
}
10
Subscribe to my newsletter
Read articles from Dhadve Yash directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by