Practical Examples of C++ Concurrency
Table of contents
- std::mutex
- std::timed_mutex
- std::recursive_mutex
- std::recursive_timed_mutex
- std::latch
- std::barrier
- std::atomic
- std::thread
- std::jthread
- std::stop_token, std::stop_source, std::stop_callback
- std::counting_semaphore
- std::binary_semaphore
- std::atomic_flag
- Scoped lock
- Shared lock
- Lock guard
- std::once_flag and std::call_once
- Conclusion
For more insights and resources on programming and game development, visit my website at maxcomperatore.com.
std::mutex
A std::mutex is like a bathroom door with a lock. When a thread tries to access a resource protected by the mutex, it locks the door behind it. This prevents other threads from entering until the current occupant finishes and unlocks the door. When a thread finishes using the shared resource, it unlocks the mutex, allowing other threads to access the resource.
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;

void bathroom() {
    // One enters the bathroom, the other waits, like two friends rushing toward a bathroom.
    std::lock_guard<std::mutex> lock(mtx);
    std::cout << "Using the bathroom" << std::endl;
    // Doing things...
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Leaving the bathroom" << std::endl;
    // The other one can enter now.
}

int main() {
    std::thread t1(bathroom);
    std::thread t2(bathroom);
    t1.join();
    t2.join();
    return 0;
}
std::timed_mutex
A std::timed_mutex is like a bathroom where you wait only for a specific time before giving up. If the mutex is not acquired within the given time, the thread abandons the attempt and continues.
#include <iostream>
#include <thread>
#include <mutex>

std::timed_mutex mtx;

void bathroom() {
    // The second thread waits up to 3 seconds to use the bathroom;
    // otherwise it goes looking for another one.
    if (mtx.try_lock_for(std::chrono::seconds(3))) {
        std::cout << "Using bathroom" << std::endl;
        // This thread uses the bathroom for less time than the other is willing to wait,
        // so both threads get to use the toilet, one after the other.
        std::this_thread::sleep_for(std::chrono::seconds(2));
        // Leaving the toilet.
        mtx.unlock();
    } else {
        // Without this else branch the thread just gives up silently,
        // like dying while waiting for the bathroom, very common.
        std::cout << "Bathroom is occupied, I'm going to look for another bathroom because I waited too long (3 seconds)" << std::endl;
    }
}

int main() {
    std::thread t1(bathroom);
    std::thread t2(bathroom);
    t1.join();
    t2.join();
    return 0;
}
std::recursive_mutex
A std::recursive_mutex is like a bathroom with multiple stalls, allowing the same thread to acquire the mutex multiple times. This is useful for recursive functions. Make sure you unlock as many times as you lock to avoid deadlocks.
#include <iostream>
#include <thread>
#include <mutex>

const int NUM_STALLS = 3;
std::recursive_mutex bathroom_mutex;

void enterBathroom(int personId) {
    std::lock_guard<std::recursive_mutex> guard(bathroom_mutex);
    std::cout << "Person " << personId << " enters the bathroom.\n";
    // Simulate using multiple stalls by acquiring the mutex again for each one.
    // The same thread can lock a recursive_mutex multiple times; other threads stay blocked.
    for (int i = 0; i < 2; ++i) {
        bathroom_mutex.lock();
        std::cout << "Person " << personId << " is using stall " << i + 1 << ".\n";
    }
    // Simulate using the stalls for some time.
    std::this_thread::sleep_for(std::chrono::seconds(3));
    // Unlock once for each extra lock, or the other threads stay blocked forever.
    for (int i = 0; i < 2; ++i) {
        bathroom_mutex.unlock();
    }
    std::cout << "Person " << personId << " exits the bathroom.\n";
} // The lock_guard is destroyed here, releasing the last level of the lock.

int main() {
    // Create threads representing people entering the bathroom.
    std::thread people[NUM_STALLS];
    for (int i = 0; i < NUM_STALLS; ++i) {
        people[i] = std::thread(enterBathroom, i + 1);
    }
    // Join the threads to wait for them to finish.
    for (int i = 0; i < NUM_STALLS; ++i) {
        people[i].join();
    }
    // Possible output:
    // Person 1 enters the bathroom.
    // Person 1 is using stall 1.
    // Person 1 is using stall 2.
    // Person 1 exits the bathroom.
    // Person 3 enters the bathroom.
    // Person 3 is using stall 1.
    // Person 3 is using stall 2.
    // Person 3 exits the bathroom.
    // Person 2 enters the bathroom.
    // Person 2 is using stall 1.
    // Person 2 is using stall 2.
    // Person 2 exits the bathroom.
    return 0;
}
std::recursive_timed_mutex
std::recursive_timed_mutex combines the features of std::recursive_mutex with a timeout mechanism. It allows recursive locking, but a thread abandons the attempt if the lock is not acquired within the specified time.
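Here's a minimal sketch (the countdown function and the timings are invented for illustration): one thread recursively re-locks the mutex it already owns, while the other gives up if it can't get the lock within 100 milliseconds.
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>

std::recursive_timed_mutex rtm;

void countdown(int n) {
    // The thread that already owns the mutex can lock it again immediately;
    // another thread waits at most 100 ms before giving up.
    if (rtm.try_lock_for(std::chrono::milliseconds(100))) {
        std::cout << "Tick " << n << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        if (n > 0) {
            countdown(n - 1); // the recursive call re-locks the same mutex
        }
        rtm.unlock(); // unlock once per successful lock
    } else {
        std::cout << "Gave up waiting after 100 ms\n";
    }
}

int main() {
    std::thread t1(countdown, 3);
    std::thread t2(countdown, 3);
    t1.join();
    t2.join();
    return 0;
}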
std::latch
A std::latch acts as a synchronization barrier that waits for a specified number of threads to reach a certain point before any of them proceeds.
#include <iostream>
#include <thread>
#include <latch>

// We need three participants to arrive before the race starts (before the latch opens).
// The latch lives at namespace scope so all threads share the same one.
std::latch startingLine(3);

void participant(const int id) {
    std::cout << "Participant " << id << " has arrived at the starting line.\n";
    // Decrement the count of the latch.
    startingLine.count_down();
    // Wait until all participants have arrived (until the count reaches 0).
    startingLine.wait();
    // The two calls above can be combined into one:
    // startingLine.arrive_and_wait();
    // All participants start at the same time thanks to the latch.
    std::cout << "Participant " << id << " starts the race!\n";
}

int main() {
    std::jthread t1(::participant, 1);
    std::jthread t2(::participant, 2);
    std::jthread t3(::participant, 3);
    return 0;
}
All threads wait at the latch until every thread has arrived; it's like a checkpoint that ensures everyone is at the same place!
std::barrier
A std::barrier is a reusable synchronization primitive that allows multiple threads to synchronize at specific points in their execution. Unlike std::latch, a std::barrier automatically resets after each phase and can be reused.
#include <iostream>
#include <thread>
#include <barrier>

const int NUM_THREADS = 3;
std::barrier barrier(::NUM_THREADS);

void worker(const int id) {
    std::cout << "Worker " << id << " started\n";
    std::this_thread::sleep_for(std::chrono::seconds(id));
    std::cout << "Worker " << id << " finished work and waiting at the barrier\n";
    // Wait at the barrier.
    ::barrier.arrive_and_wait();
    std::cout << "Worker " << id << " passed the barrier and continued\n";
    // Once every thread has arrived, the barrier automatically resets itself
    // for the next phase, so we can simply wait at it again.
    std::cout << "Worker " << id << " waiting at the barrier for the next phase\n";
    ::barrier.arrive_and_wait();
    // ...
}

int main() {
    std::jthread t1(::worker, 1);
    std::jthread t2(::worker, 2);
    std::jthread t3(::worker, 3);
    return 0;
}
Barriers make all threads wait until the expected number of threads has arrived; then the barrier opens, and unlike a latch we can reuse it later in the code!
std::atomic
std::atomic provides a way to perform thread-safe operations on variables without requiring locks. It ensures that operations are performed atomically, avoiding data races.
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> counter(0);

void incrementCounter() {
    // An atomic can handle reads and writes from multiple threads;
    // without it, this counter would end up with a wrong, garbage value.
    for (int i = 0; i < 10000; ++i) {
        // Atomically add 1; relaxed ordering is enough for a plain counter.
        counter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    std::thread t1(incrementCounter);
    std::thread t2(incrementCounter);
    t1.join();
    t2.join();
    std::cout << "Final value of counter: " << counter << std::endl; // 20000, clean.
    return 0;
}
std::thread
A std::thread is a fundamental building block of concurrent programming in C++. It represents a separate execution context that can run concurrently with other threads in a program. You create a thread by passing it a function to execute; once that function finishes executing, the thread's work is done.
However, it's crucial to manage the lifecycle of std::thread objects properly. When you create a std::thread, you must join it or detach it before it goes out of scope; destroying a thread that is still joinable terminates the program. Typically, you call join() on the thread to wait for it to finish its execution before proceeding with the rest of the program. However, this is the old approach...
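A minimal sketch of that lifecycle (the work function here is just a placeholder for illustration):
#include <iostream>
#include <thread>

void work() {
    std::cout << "Doing some work on another thread\n";
}

int main() {
    std::thread t(work);
    // We must join (or detach) before t goes out of scope,
    // otherwise std::terminate is called.
    t.join();
    return 0;
}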
std::jthread
std::jthread is a modern alternative to std::thread introduced in C++20. It automatically joins when it goes out of scope, simplifying thread management and removing the risk of forgetting to join, and it also works hand in hand with the stop tokens described in the next section.
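The same sketch with std::jthread (again, the work function is just a placeholder): no explicit join() is needed because the destructor joins for us.
#include <iostream>
#include <thread>

void work() {
    std::cout << "Doing some work on another thread\n";
}

int main() {
    std::jthread t(work);
    // No join() needed: the jthread joins automatically in its destructor.
    return 0;
}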
std::stop_token, std::stop_source, std::stop_callback
These are used for cooperative thread stopping. A std::stop_token allows a thread to check whether a stop request has been made, a std::stop_source is used to issue that request, and a std::stop_callback registers callbacks to be executed when a stop request is made.
#include <chrono>
#include <iostream>
#include <stop_token>
#include <thread>

int main() {
    std::stop_source stopSrc;
    std::stop_callback cb(stopSrc.get_token(),
        []() { std::cout << "Callback called!\n"; });
    // We do some work in the lambda; the token from stopSrc is passed in explicitly,
    // so that stopSrc.request_stop() actually reaches this thread.
    std::jthread jt0([](const std::stop_token& tk) {
        for (int i = 0; i < 1'000'000'000; ++i) {
            std::cout << "Printing value: " << i << '\n';
            // Once stopSrc.request_stop() is called, the token reports a stop request.
            if (tk.stop_requested()) {
                std::cout << "Thread stopped!\n";
                return;
            }
        }
    }, stopSrc.get_token());
    // Sleep the main thread for 2 seconds so that the jthread can do some work.
    std::this_thread::sleep_for(std::chrono::seconds(2));
    // Request the associated token (and the thread using it) to stop;
    // this also fires the registered callback.
    stopSrc.request_stop();
    std::cout << "Request to stop thread!\n";
    return 0;
}
std::counting_semaphore
A std::counting_semaphore is a synchronization primitive, like a bathroom with limited capacity. The template parameter is the maximum value the semaphore's counter can reach: the maximum number of entities (threads, for example) that can access the shared resource simultaneously. The constructor argument is the initial count: how many permits are available when the semaphore is created. Think of it as a more complete mutex with notifications and more capacity.
The two values are usually the same, because it would not make a lot of sense to build a bathroom where three people fit but only one key is ever handed out, or would it?
For instance, a binary semaphore (std::counting_semaphore<1>, aliased as std::binary_semaphore) allows only one entity to access the resource at a time, mimicking the behavior of a single-stall bathroom where occupancy is restricted to one person.
On the other hand, a semaphore with a capacity greater than one (e.g., std::counting_semaphore<2>) permits multiple entities to access the resource simultaneously.
#include <iostream>
#include <thread>
#include <semaphore>

// Semaphore with a maximum count of 3 and an initial count of 3.
std::counting_semaphore<3> semaphore(3);

void accessResource(const int id) {
    // Acquire a permit from the semaphore.
    ::semaphore.acquire();
    std::cout << "Thread " << id << " has the permit.\n";
    // Simulate accessing the resource.
    std::this_thread::sleep_for(std::chrono::seconds(5));
    // Release the permit back to the semaphore.
    ::semaphore.release();
    std::cout << "Thread " << id << " has released the permit.\n";
}

int main() {
    std::jthread t1(::accessResource, 1);
    std::jthread t2(::accessResource, 2);
    std::jthread t3(::accessResource, 3);
    // Fourth thread trying to access the resource:
    // it blocks until one of the first three threads releases a permit.
    std::jthread t4(::accessResource, 4);
    // Possible output:
    // Thread 2 has the permit.
    // Thread 3 has the permit.
    // Thread 1 has the permit.
    // Thread 1 has released the permit.
    // Thread 2 has released the permit.
    // Thread 3 has released the permit.
    // Thread 4 has the permit.
    // Thread 4 has released the permit.
    return 0;
}
std::binary_semaphore
std::binary_semaphore is an alias for std::counting_semaphore<1>: a semaphore with a capacity of 1. In other words, it allows only one entity (such as a thread) to access a shared resource at a time.
Similarly to a mutex, a binary semaphore can be used to protect critical sections of code or shared resources from concurrent access. However, compared to a mutex, a binary semaphore provides additional functionality such as signaling and waiting, making it suitable for more complex synchronization scenarios.
For example, in a producer-consumer scenario, a binary semaphore can be used to control access to a shared buffer. The producer signals the semaphore when it adds data to the buffer, and the consumer waits for the semaphore to be signaled before accessing the buffer. This ensures that the producer and consumer do not access the buffer simultaneously.
#include <iostream>
#include <thread>
#include <semaphore>

#define SIMUL_WORK std::this_thread::sleep_for(std::chrono::milliseconds(1000))

// An empty bathroom, ready for the producer to enter and produce.
std::binary_semaphore producer{ 1 };
// A full bathroom (count 0): someone must produce first
// and release it before the consumer can enter.
std::binary_semaphore consumer{ 0 };

void produce() {
    for (int i = 0; i < 5; ++i) {
        // We enter the producer bathroom; no one else can enter.
        ::producer.acquire();
        SIMUL_WORK;
        std::cout << "Produced\n";
        // We release the consumer bathroom, so the consumer can enter.
        ::consumer.release();
    }
}

void consume() {
    for (int i = 0; i < 5; ++i) {
        // We enter the consumer bathroom; no one else can enter.
        ::consumer.acquire();
        // Simulate consumption time.
        SIMUL_WORK;
        std::cout << "Consumed\n";
        // We release the producer bathroom, so the producer can enter again.
        ::producer.release();
    }
}

int main() {
    std::jthread t1(::produce);
    std::jthread t2(::consume);
    // The producer enters its bathroom, produces, then releases the consumer's bathroom,
    // letting the consumer enter and consume. Once it has consumed, it releases the
    // producer's bathroom again, and the cycle repeats five times.
    // We could sprinkle some mutexes to make sure the threads are not stepping on
    // each other, but the two semaphores already keep them in lockstep here.
}
std::atomic_flag
std::atomic_flag provides atomic operations on a boolean flag. Unlike std::atomic<bool>, std::atomic_flag has less functionality but is more lightweight and suitable for simple atomic operations.
One of the primary uses of std::atomic_flag is implementing spin locks, where threads repeatedly check the state of the flag until it changes. This makes std::atomic_flag ideal for scenarios where you need lightweight synchronization without the overhead of a mutex or a condition variable.
It's worth noting that for more complex synchronization scenarios, or when additional functionality is required, std::condition_variable may be a better choice. std::condition_variable lets threads wait efficiently for a condition to become true, providing more flexibility and options for signaling between threads. A minimal sketch of that pattern follows the spin-lock example below.
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>

void critical_section(const int id) {
    // Since C++20 a default-constructed atomic_flag starts in the clear state.
    static std::atomic_flag lock;
    unsigned int spin_count = 1;
    // Spin until we acquire the flag. Acquire ordering ensures that no read or write
    // inside the critical section can be reordered before this point.
    // (Checking with lock.test() first, "test and test-and-set", would cut
    // cache-line traffic even further.)
    while (lock.test_and_set(std::memory_order_acquire)) {
        // Incremental (exponential) backoff between attempts;
        // a randomized backoff tends to cause fewer cache misses under heavy contention.
        std::this_thread::sleep_for(std::chrono::milliseconds(spin_count));
        spin_count <<= 1;
    }
    std::cout << "Thread " << id << " entered critical section\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    std::cout << "Thread " << id << " exited critical section\n";
    // Release ordering makes our writes visible to the next thread that acquires the flag.
    lock.clear(std::memory_order_release);
}

int main() {
    std::vector<std::jthread> threads;
    for (int i = 0; i < 5; ++i) {
        // Keep the jthreads alive so they actually run concurrently;
        // a temporary jthread would join immediately and serialize the loop.
        threads.emplace_back(::critical_section, i);
    }
}
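Since std::condition_variable was mentioned above, here's a minimal sketch of the usual pattern (the ready flag and thread names are invented for the example): one thread sleeps inside cv.wait() until another thread sets the flag under the mutex and notifies it.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
bool ready = false;

int main() {
    std::jthread waiter([] {
        std::unique_lock<std::mutex> lock(m);
        // wait() releases the mutex while sleeping and re-checks the predicate on wake-up.
        cv.wait(lock, [] { return ready; });
        std::cout << "Waiter woke up\n";
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    {
        std::lock_guard<std::mutex> lock(m);
        ready = true;
    }
    cv.notify_one();
}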
Scoped lock
A deadlock-avoiding lock. It can lock multiple mutexes at once and unlocks them all at the end of the block. It uses std::lock internally to acquire the mutexes atomically, which is what avoids deadlocks. It's a lightweight wrapper, basically equal to lock_guard, but it can lock multiple mutexes at once.
#include <iostream>
#include <thread>
#include <mutex>

std::mutex g_mutex1;
std::mutex g_mutex2;

void incrementCounter(int i) {
    // Locks both mutexes at once; look at the syntax, it's just a wrapper!
    std::scoped_lock lock(g_mutex1, g_mutex2);
    // The first thread enters; the second waits.
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Thread ID " << i << " is running\n";
} // The scoped_lock unlocks both mutexes here.

int main() {
    std::thread t1(incrementCounter, 0);
    std::thread t2(incrementCounter, 1);
    t1.join();
    t2.join();
    return 0;
}
Shared lock
A lock for sharing. Multiple readers can hold it at the same time, but only one writer at a time: the mutex is either locked exclusively by a single writer thread, or, if there are no writers, shared across any number of readers.
For example, if we have six readers active, the writers wait.
While a writer is writing, no readers can share the lock, because the writer holds it exclusively.
So it's either the writer or the readers at any given moment. This also comes in another flavor, std::shared_timed_mutex, which adds the possibility of waiting a certain time before giving up on the lock and moving on; for example, a reader could be willing to wait for 4 seconds and, after that, leave and go do something else. A small sketch of that flavor follows this paragraph.
There's no fixed limit on the number of readers or writers. Conceptually, a shared mutex combines a unique lock for the writer and a shared lock for the readers internally.
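A minimal sketch of that timed flavor (the 4-second deadline and the helper names are invented for the example): a writer holds the lock exclusively for longer than the reader is willing to wait, so the reader gives up.
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>

std::shared_timed_mutex stm;

void impatient_reader() {
    // Try to obtain shared (reader) ownership, but give up after 4 seconds.
    if (stm.try_lock_shared_for(std::chrono::seconds(4))) {
        std::cout << "Reader got the lock\n";
        stm.unlock_shared();
    } else {
        std::cout << "Reader gave up after 4 seconds\n";
    }
}

int main() {
    // A writer holds the lock exclusively for 5 seconds, so the reader times out.
    std::jthread writer([] {
        std::unique_lock<std::shared_timed_mutex> lock(stm);
        std::this_thread::sleep_for(std::chrono::seconds(5));
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // let the writer grab the lock first
    std::jthread reader(impatient_reader);
}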
#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>

std::shared_mutex rw_mutex;
int shared_data = 0;

void reader(int id) {
    for (int i = 0; i < 10; ++i) {
        // Lock shared access (multiple readers allowed).
        std::shared_lock<std::shared_mutex> lock(rw_mutex);
        std::cout << "Reader " << id << " read shared data: " << shared_data << '\n';
        // Simulate reading.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void writer(int id) {
    for (int i = 0; i < 10; ++i) {
        // Lock exclusive access (only one writer allowed).
        // The writer cannot enter while there are readers, and vice versa.
        std::unique_lock<std::shared_mutex> lock(rw_mutex);
        // Increment the shared data.
        shared_data++;
        std::cout << "Writer " << id << " incremented shared data to: " << shared_data << '\n';
        // Simulate writing.
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

int main() {
    // Create reader threads.
    std::thread readers[6];
    for (int i = 0; i < 6; ++i) {
        readers[i] = std::thread(reader, i);
    }
    // Create the writer thread.
    std::thread writerThread(writer, 1);
    // Join the threads (the loops are bounded so the program actually terminates).
    for (int i = 0; i < 6; ++i) {
        readers[i].join();
    }
    writerThread.join();
    return 0;
}
Lock guard
lock_guard is a lightweight alternative to unique_lock in C++. It is particularly useful when you need to lock a scope and don't intend to manually release or transfer the lock. Unlike unique_lock, which offers more flexibility and options, lock_guard provides a simpler interface with less overhead: it automatically locks the associated mutex on construction and releases it on destruction. It is commonly used in straightforward scenarios where manual lock management is unnecessary, helping to keep code concise and efficient, like the majority of the code samples here. Useful for simple multithreaded code. Note that you can't unlock a lock_guard manually.
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;

void printNumbers(int id) {
    // The lock ends at the end of the scope; no need to unlock or move it around,
    // so we use this cheap wrapper instead of unique_lock.
    std::lock_guard<std::mutex> lock(mtx);
    // Critical section.
    for (int i = 1; i <= 5; ++i) {
        std::cout << "Thread " << id << " prints: " << i << '\n';
    } // The lock is released here automatically.
}

int main() {
    std::thread t1(::printNumbers, 1);
    std::thread t2(::printNumbers, 2);
    t1.join();
    t2.join();
    // Possible output (which thread goes first may vary):
    // Thread 1 prints: 1
    // Thread 1 prints: 2
    // Thread 1 prints: 3
    // Thread 1 prints: 4
    // Thread 1 prints: 5
    // Thread 2 prints: 1
    // Thread 2 prints: 2
    // Thread 2 prints: 3
    // Thread 2 prints: 4
    // Thread 2 prints: 5
}
std::once_flag and std::call_once
These are useful functions to ensure that a specific function is executed only once, regardless of how many times it is called from different threads or contexts.
std::once_flag serves as a synchronization flag to coordinate the execution of a function across multiple threads. It ensures that the function associated with it is called exactly once, even in the presence of concurrent access.
std::call_once is the function used in conjunction with std::once_flag to achieve this behavior. It takes a reference to a std::once_flag and a callable object (usually a function or a lambda expression) as arguments. The first time std::call_once is called with a particular std::once_flag, it executes the associated function, and subsequent calls to std::call_once with the same std::once_flag are ignored.
There's no way to reset a once flag. One use only.
#include <iostream>
#include <mutex>

std::once_flag flag;

void do_once() {
    std::cout << "Called only once!\n";
}

int main() {
    for (int i = 0; i < 1000; ++i) {
        // The function runs only on the first call; the rest of the calls are ignored.
        std::call_once(::flag, ::do_once);
    }
}
Conclusion
Today we delved into some of the fundamentals of concurrency in C++: threads, atomics, barriers, latches, locks, and everything in between. These are fundamental for complex software and games, where we need to leverage threading in performance-critical applications. The quiz awaits!
Quiz
A. What is the purpose of a std::recursive_mutex?
A) To synchronize access to shared resources across multiple threads.
B) To ensure only one thread can access a critical section at a time.
C) To allow a thread to lock the same mutex multiple times without causing a deadlock.
D) To create a one-time barrier for synchronizing multiple threads.
B. What is the main advantage of using std::jthread over std::thread?
A) std::jthread is more lightweight than std::thread.
B) std::jthread allows for better control over thread priority.
C) std::jthread provides better error handling for thread creation.
D) std::jthread automatically joins the thread when it goes out of scope.
C. What is the purpose of a std::stop_token?
A) To provide a mechanism for threads to stop execution gracefully.
B) To synchronize access to critical sections of code.
C) To prevent other threads from accessing shared resources.
D) To control the execution order of threads.
D. What is the difference between a std::latch and a std::barrier?
A) std::barrier can synchronize an arbitrary number of threads, while std::latch is limited to a fixed number.
B) std::latch is used for thread signaling, while std::barrier is used for mutual exclusion.
C) std::barrier allows threads to proceed once a certain number of threads have reached a point in the code, while std::latch blocks until all threads have reached a point.
D) std::barrier is reusable, while std::latch is one-time use only.
E. What is the purpose of std::atomic?
A) To provide a mechanism for thread creation.
B) To create atomic operations on fundamental data types.
C) To synchronize access to shared resources.
D) To implement locking mechanisms.
F. What is the purpose of a std::semaphore in C++ concurrency?
A) To provide a mechanism for threads to stop execution gracefully.
B) To create atomic operations on fundamental data types.
C) To synchronize access to critical sections of code.
D) To create a one-time barrier for synchronizing multiple threads.
Answers
A. C
B. D
C. A
D. D
E. B
F. C
Thank you for following along with this tutorial on thread synchronization in C++. I hope you found the information valuable and applicable to your projects. Happy coding!