Producer-Consumer Pattern


The producer-consumer pattern is a fundamental synchronization pattern in multithreaded programming. In this article, we'll dive deep into implementing this pattern in C++ using low-level synchronization primitives - specifically mutex and condition variables. Rather than relying on pre-built thread-safe queues, we'll build our solution from the ground up to understand how these mechanisms work.
What is the Producer-Consumer Pattern?
The producer-consumer pattern involves two types of entities:
Producers: Threads that generate data or tasks
Consumers: Threads that process the data or tasks generated by producers
These entities communicate through a shared buffer or queue. The pattern allows for decoupling data production from its consumption, enabling parallel processing and improved throughput.
Real life analgy
a restaurant kitchen and dining area.
Producers (Chefs in the Kitchen): The chefs prepare food (data) and place it on a counter or serving area (a shared buffer). They keep cooking as long as there’s space on the counter.
Consumers (Waiters or Customers): The waiters pick up the prepared food from the counter and serve it to customers, or customers take the food directly (consuming data). They can only take food if there’s something available on the counter.
Shared Buffer (Counter): The counter has limited space, so chefs can only add food if there’s room, and waiters can only take food if there’s something there. If the counter is full, chefs wait (producer blocked). If it’s empty, waiters wait (consumer blocked).
Synchronization: The kitchen staff and waiters coordinate to avoid conflicts—like ensuring chefs don’t overfill the counter or waiters don’t try to grab food that isn’t there. This is similar to using locks or semaphores in programming to manage access to the shared buffer.
What Problems Does It Solve?
The producer-consumer pattern addresses several critical challenges in concurrent programming:
Workload Distribution: Efficiently distributes tasks across multiple worker threads
Rate Mismatch Handling: Manages scenarios where producers and consumers work at different speeds
Resource Utilization: Maximizes CPU utilization by allowing parallel processing
Backpressure Mechanism: Provides natural throttling when production outpaces consumption
Decoupling: Separates the concerns of data generation and processing
Synchronization Challenges
Implementing this pattern correctly requires addressing several concurrency issues:
Race Conditions: Multiple threads accessing shared data simultaneously
Buffer Overflow: Producers overwhelming a bounded buffer
Buffer Underflow: Consumers attempting to process from an empty buffer
Deadlocks: Threads waiting indefinitely due to circular dependencies
Implementing the Producer-Consumer Pattern
Let's implement a complete producer-consumer solution using a bounded buffer:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
// Shared resources
std::queue<int> buffer; // Raw (unsafe) queue
std::mutex mtx; // Mutex to protect the queue
std::condition_variable producer_cv; // Condition variable for signaling
std::condition_variable consumer_cv; // we can use only one cv but two is safer
const int BUFFER_CAPACITY = 5; // Maximum buffer size
// Producer function
void producer(int num_items) {
for (int i = 0; i < num_items; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// Wait if buffer is full (optional)
producer_cv.wait(lock, [&] { return buffer.size() < BUFFER_CAPACITY; });
buffer.push(i);
std::cout << "Produced: " << i << std::endl;
lock.unlock();
producer_cv.notify_one(); // Wake up one consumer
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// Consumer function
void consumer(int num_items) {
for (int i = 0; i < num_items; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// Wait if buffer is empty
consumer_cv.wait(lock, [&] { return !buffer.empty(); });
int item = buffer.front();
buffer.pop();
std::cout << "Consumed: " << item << std::endl;
lock.unlock();
consumer_cv.notify_one(); // Wake up one producer (if using buffer limits)
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
int main() {
const int NUM_ITEMS = 10;
std::thread prod(producer, NUM_ITEMS);
std::thread cons(consumer, NUM_ITEMS);
prod.join();
cons.join();
return 0;
}
Breaking Down the Implementation
1. BoundedBuffer Class
Our contains:
A queue for storing items
A capacity limit
A mutex for thread synchronization
Two condition variables:
producer_cv
: Signaled when the buffer is not full (producers wait on this)consumer_cv
: Signaled when the buffer is not empty (consumers wait on this)
2. Key Methods
The class provides three key methods:
produce(): Adds an item to the buffer, waiting if buffer is full
consume(): Removes an item from the buffer, waiting if buffer is empty
3. Condition Variable Logic
The most crucial part is how condition variables are used:
Producers wait on
producer_cv
until there's space or shutdown is requestedConsumers wait on
consumer_cv
until there are items or shutdown is requestedAfter producing, we notify one consumer
After consuming, we notify one producer
Visualizing the Producer-Consumer Pattern
Let's visualize the flow with a diagram:
┌───────────────┐ ┌──────────────┐ ┌───────────────┐
│ Producer 1 │ │ │ │ Consumer 1 │
└───────┬───────┘ │ │ └───────┬───────┘
│ │ │ │
│ produce() │ Bounded │ consume() │
▼ │ Buffer │ ▼
┌───────────────┐ │ │ ┌───────────────┐
│ Producer 2 ├────►│ [■■■□□□□□] ├────►│ Consumer 2 │
└───────┬───────┘ │ │ └───────┬───────┘
│ │ │ │
│ │ │ │
▼ │ │ ▼
┌───────────────┐ └──────────────┘ ┌───────────────┐
│ Producer 3 │ │ Consumer 3 │
└───────────────┘ └───────────────┘
│ │
│ │
▼ ▼
wait on not_full wait on not_empty
if buffer is full if buffer is empty
Using C++20 Features
C++20 introduces new synchronization primitives like std::counting_semaphore
and std::latch
that can simplify producer-consumer implementations:
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <semaphore>
#include <chrono>
#include <atomic>
constexpr int MAX_QUEUE_SIZE = 10;
class SemaphoreQueue {
public:
void push(int value) {
space_available.acquire(); // wait for space
{
std::scoped_lock lock(mutex_);
queue_.push(value);
}
items_available.release(); // signal item added
}
bool pop(int& value, std::atomic<bool>& running) {
if (!running.load()) return false;
items_available.acquire(); // wait for item
{
std::scoped_lock lock(mutex_);
if (!queue_.empty()) {
value = queue_.front();
queue_.pop();
} else {
return false;
}
}
space_available.release(); // signal space available
return true;
}
private:
std::queue<int> queue_;
std::mutex mutex_;
std::counting_semaphore<MAX_QUEUE_SIZE> items_available{0};
std::counting_semaphore<MAX_QUEUE_SIZE> space_available{MAX_QUEUE_SIZE};
};
void producer(SemaphoreQueue& q, std::atomic<bool>& running) {
int counter = 0;
while (running.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
q.push(counter);
std::cout << "[Producer] Produced: " << counter << '\n';
++counter;
}
}
void consumer(SemaphoreQueue& q, std::atomic<bool>& running) {
int value;
while (running.load() || !q_empty(q)) {
if (q.pop(value, running)) {
std::cout << "[Consumer] Consumed: " << value << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
}
// Helper to check if queue is empty (under lock)
bool q_empty(SemaphoreQueue& q) {
static std::mutex check_mutex;
std::scoped_lock lock(check_mutex);
return false; // not accurate for simplicity; could add a `size()` method
}
int main() {
SemaphoreQueue queue;
std::atomic<bool> running{true};
std::jthread prod_thread([&] { producer(queue, running); });
std::jthread cons_thread([&] { consumer(queue, running); });
std::this_thread::sleep_for(std::chrono::seconds(5));
running = false;
// Threads join automatically
return 0;
}
External Libraries for Producer-Consumer in C++
While we've focused on implementing the pattern from scratch, several libraries provide ready-to-use implementations:
Boost.Thread: Provides
boost::sync_bounded_queue
and other synchronization primitivesIntel Threading Building Blocks (TBB): Offers
tbb::concurrent_queue
and a rich task schedulerlibcds: A C++ library of concurrent data structures with several producer-consumer queue implementations
folly: Facebook's open-source library includes
folly::ProducerConsumerQueue
concurrentqueue: A fast multi-producer, multi-consumer lock-free concurrent queue by Cameron Desrochers
Real-World Applications
The producer-consumer pattern is used in:
Job Schedulers: Processing tasks in distributed systems
Media Processing: Audio/video encoding pipelines
Web Servers: Handling client requests with thread pools
Database Systems: Query processing and result buffers
Message Queues: Implementing pub-sub systems
Simulation Systems: Event processing in discrete event simulations
Performance Considerations
When implementing producer-consumer systems, consider:
Buffer Size: Too small causes excessive context switching, too large wastes memory
Contention: High producer or consumer counts can lead to lock contention
Granularity: Processing items in batches can improve throughput
Memory Allocation: Pre-allocating buffers can reduce overhead
Cache Considerations: Design data structures to minimize false sharing
Conclusion
The producer-consumer pattern using mutex and condition variables provides a powerful mechanism for coordinating work between threads. Understanding the underlying synchronization primitives gives you the flexibility to tailor implementations to specific requirements and constraints.
By implementing the pattern directly using mutex and condition variables, you gain insights into thread synchronization that using high-level abstractions might obscure. This knowledge is invaluable when debugging concurrency issues or optimizing performance in multithreaded applications.
Next time you need to distribute work among threads, consider how the producer-consumer pattern might help you build more efficient and robust concurrent systems.
Additional Resources
C++ Concurrency in Action by Anthony Williams
The Art of Multiprocessor Programming by Maurice Herlihy
Subscribe to my newsletter
Read articles from Abdullah Khairy directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
